1use crate::codec::{
21 AvroDataType, AvroLiteral, Codec, EnumMapping, Promotion, ResolutionInfo, ResolvedField,
22 ResolvedRecord, ResolvedUnion, Tz,
23};
24use crate::errors::AvroError;
25use crate::reader::cursor::AvroCursor;
26use crate::schema::Nullability;
27#[cfg(feature = "small_decimals")]
28use arrow_array::builder::{Decimal32Builder, Decimal64Builder};
29use arrow_array::builder::{Decimal128Builder, Decimal256Builder, IntervalMonthDayNanoBuilder};
30use arrow_array::types::*;
31use arrow_array::*;
32use arrow_buffer::*;
33#[cfg(feature = "small_decimals")]
34use arrow_schema::{DECIMAL32_MAX_PRECISION, DECIMAL64_MAX_PRECISION};
35use arrow_schema::{
36 DECIMAL128_MAX_PRECISION, DECIMAL256_MAX_PRECISION, DataType, Field as ArrowField, FieldRef,
37 Fields, Schema as ArrowSchema, SchemaRef, UnionFields, UnionMode,
38};
39#[cfg(feature = "avro_custom_types")]
40use arrow_select::take::{TakeOptions, take};
41use strum_macros::AsRefStr;
42use uuid::Uuid;
43
44use std::cmp::Ordering;
45use std::mem;
46use std::sync::Arc;
47
/// Initial capacity handed to the value buffers, offset builders and decimal
/// builders that the decoders in this file pre-allocate.
const DEFAULT_CAPACITY: usize = 1024;
49
/// Reads one decimal value from `$input` via `read_decimal_bytes_be::<N>` and
/// appends it to `$sink`.
///
/// The helper yields an `N`-byte big-endian array, which is reinterpreted as
/// `$Int` with `from_be_bytes`. A read failure is propagated with `?`, so the
/// macro must be expanded inside a function returning a compatible `Result`.
macro_rules! decode_decimal {
    ($len:expr, $input:expr, $sink:expr, $N:expr, $Int:ty) => {{
        let be_bytes = read_decimal_bytes_be::<{ $N }>($input, $len)?;
        let value = <$Int>::from_be_bytes(be_bytes);
        $sink.append_value(value);
    }};
}
57
/// Finishes a decimal builder and re-wraps its values as a `$ArrayTy` array.
///
/// Only the value buffer of the finished array is kept; it is combined with
/// the caller-supplied `$validity` and then stamped with the precision and
/// scale (a missing scale is treated as 0). Evaluates to an `ArrayRef`; both
/// fallible steps use `?`, so the enclosing function must return a compatible
/// `Result`.
macro_rules! flush_decimal {
    ($bld:expr, $prec:expr, $scl:expr, $validity:expr, $ArrayTy:ty) => {{
        let (_, values, _) = $bld.finish().into_parts();
        let unscaled = <$ArrayTy>::try_new(values, $validity)?;
        let array = unscaled.with_precision_and_scale(*$prec as u8, $scl.unwrap_or(0) as i8)?;
        let erased: ArrayRef = Arc::new(array);
        erased
    }};
}
67
/// Appends a schema default to a decimal builder.
///
/// The default must be an `AvroLiteral::Bytes`; its bytes are passed through
/// `sign_cast_to::<N>` to obtain exactly `N` big-endian bytes, converted to
/// `$Int` and appended. Any other literal yields `AvroError::InvalidArgument`
/// naming the decimal flavour given in `$label`. Evaluates to
/// `Result<(), AvroError>`.
macro_rules! append_decimal_default {
    ($default:expr, $sink:expr, $N:literal, $Int:ty, $label:literal) => {{
        if let AvroLiteral::Bytes(raw) = $default {
            let widened = sign_cast_to::<$N>(raw)?;
            $sink.append_value(<$Int>::from_be_bytes(widened));
            Ok(())
        } else {
            Err(AvroError::InvalidArgument(
                concat!(
                    "Default for ",
                    $label,
                    " must be bytes (two's-complement big-endian)"
                )
                .to_string(),
            ))
        }
    }};
}
90
/// Decodes Avro-encoded records into Arrow arrays and assembles them into a
/// [`RecordBatch`] on `flush`.
#[derive(Debug)]
pub(crate) struct RecordDecoder {
    /// Arrow schema built from the reader record's fields.
    schema: SchemaRef,
    /// One decoder per reader field, in the same order as `schema`.
    fields: Vec<Decoder>,
    /// Present only when the record carries `ResolutionInfo::Record`; when set,
    /// `decode` hands each row to the projector instead of decoding the fields
    /// one by one.
    projector: Option<Projector>,
    /// Rows decoded since the last `flush`; reset to 0 by `flush`.
    row_count: usize,
}
99
100impl RecordDecoder {
101 pub(crate) fn try_new_with_options(data_type: &AvroDataType) -> Result<Self, AvroError> {
112 match data_type.codec() {
113 Codec::Struct(reader_fields) => {
114 let mut arrow_fields = Vec::with_capacity(reader_fields.len());
116 let mut encodings = Vec::with_capacity(reader_fields.len());
117 let mut field_defaults = Vec::with_capacity(reader_fields.len());
118 for avro_field in reader_fields.iter() {
119 arrow_fields.push(avro_field.field());
120 encodings.push(Decoder::try_new(avro_field.data_type())?);
121
122 if let Some(ResolutionInfo::DefaultValue(lit)) =
123 avro_field.data_type().resolution.as_ref()
124 {
125 field_defaults.push(Some(lit.clone()));
126 } else {
127 field_defaults.push(None);
128 }
129 }
130 let projector = match data_type.resolution.as_ref() {
131 Some(ResolutionInfo::Record(rec)) => {
132 Some(ProjectorBuilder::try_new(rec, &field_defaults).build()?)
133 }
134 _ => None,
135 };
136 Ok(Self {
137 schema: Arc::new(ArrowSchema::new(arrow_fields)),
138 fields: encodings,
139 projector,
140 row_count: 0,
141 })
142 }
143 other => Err(AvroError::ParseError(format!(
144 "Expected record got {other:?}"
145 ))),
146 }
147 }
148
149 pub(crate) fn schema(&self) -> &SchemaRef {
151 &self.schema
152 }
153
154 pub(crate) fn decode(&mut self, buf: &[u8], count: usize) -> Result<usize, AvroError> {
156 let mut cursor = AvroCursor::new(buf);
157 match self.projector.as_mut() {
158 Some(proj) => {
159 for _ in 0..count {
160 proj.project_record(&mut cursor, &mut self.fields)?;
161 }
162 }
163 None => {
164 for _ in 0..count {
165 for field in &mut self.fields {
166 field.decode(&mut cursor)?;
167 }
168 }
169 }
170 }
171 self.row_count += count;
172 Ok(cursor.position())
173 }
174
175 pub(crate) fn flush(&mut self) -> Result<RecordBatch, AvroError> {
177 let arrays = self
178 .fields
179 .iter_mut()
180 .map(|x| x.flush(None))
181 .collect::<Result<Vec<_>, _>>()?;
182 let batch_options = RecordBatchOptions::new().with_row_count(Some(self.row_count));
183 self.row_count = 0;
184 RecordBatch::try_new_with_options(self.schema.clone(), arrays, &batch_options)
185 .map_err(Into::into)
186 }
187}
188
/// Per-column decoding state: each variant owns the buffers or builder that
/// accumulate decoded values for one Avro type until the column is flushed.
///
/// Variants named `XToY` are selected when the type carries a matching
/// `Promotion`; their buffer already has the promoted (reader) element type.
#[derive(Debug, AsRefStr)]
enum Decoder {
    /// Counter of null values; no data buffer is kept.
    Null(usize),
    Boolean(BooleanBufferBuilder),
    Int32(Vec<i32>),
    Int64(Vec<i64>),
    #[cfg(feature = "avro_custom_types")]
    DurationSecond(Vec<i64>),
    #[cfg(feature = "avro_custom_types")]
    DurationMillisecond(Vec<i64>),
    #[cfg(feature = "avro_custom_types")]
    DurationMicrosecond(Vec<i64>),
    #[cfg(feature = "avro_custom_types")]
    DurationNanosecond(Vec<i64>),
    #[cfg(feature = "avro_custom_types")]
    Int8(Vec<i8>),
    #[cfg(feature = "avro_custom_types")]
    Int16(Vec<i16>),
    #[cfg(feature = "avro_custom_types")]
    UInt8(Vec<u8>),
    #[cfg(feature = "avro_custom_types")]
    UInt16(Vec<u16>),
    #[cfg(feature = "avro_custom_types")]
    UInt32(Vec<u32>),
    #[cfg(feature = "avro_custom_types")]
    UInt64(Vec<u64>),
    /// Half-precision values kept as 16-bit patterns (defaults are supplied as
    /// 2 little-endian bytes).
    #[cfg(feature = "avro_custom_types")]
    Float16(Vec<u16>),
    #[cfg(feature = "avro_custom_types")]
    Date64(Vec<i64>),
    #[cfg(feature = "avro_custom_types")]
    TimeNanos(Vec<i64>),
    #[cfg(feature = "avro_custom_types")]
    Time32Secs(Vec<i32>),
    /// `(is_utc, values)`.
    #[cfg(feature = "avro_custom_types")]
    TimestampSecs(bool, Vec<i64>),
    #[cfg(feature = "avro_custom_types")]
    IntervalYearMonth(Vec<i32>),
    #[cfg(feature = "avro_custom_types")]
    IntervalMonthDayNano(Vec<IntervalMonthDayNano>),
    #[cfg(feature = "avro_custom_types")]
    IntervalDayTime(Vec<IntervalDayTime>),
    Float32(Vec<f32>),
    Float64(Vec<f64>),
    Date32(Vec<i32>),
    TimeMillis(Vec<i32>),
    TimeMicros(Vec<i64>),
    /// `(timezone taken from the codec, values)`.
    TimestampMillis(Option<Tz>, Vec<i64>),
    /// `(timezone taken from the codec, values)`.
    TimestampMicros(Option<Tz>, Vec<i64>),
    /// `(timezone taken from the codec, values)`.
    TimestampNanos(Option<Tz>, Vec<i64>),
    Int32ToInt64(Vec<i64>),
    Int32ToFloat32(Vec<f32>),
    Int32ToFloat64(Vec<f64>),
    Int64ToFloat32(Vec<f32>),
    Int64ToFloat64(Vec<f64>),
    Float32ToFloat64(Vec<f64>),
    /// `(offsets, value bytes)`; used for string readers with the
    /// `BytesToString` promotion.
    BytesToString(OffsetBufferBuilder<i32>, Vec<u8>),
    /// `(offsets, value bytes)`; used for binary readers with the
    /// `StringToBytes` promotion.
    StringToBytes(OffsetBufferBuilder<i32>, Vec<u8>),
    /// `(offsets, value bytes)`.
    Binary(OffsetBufferBuilder<i32>, Vec<u8>),
    /// `(offsets, value bytes)`.
    String(OffsetBufferBuilder<i32>, Vec<u8>),
    /// `(offsets, value bytes)`; selected for the `Utf8View` codec.
    StringView(OffsetBufferBuilder<i32>, Vec<u8>),
    /// `(item field, list offsets, item decoder)`.
    Array(FieldRef, OffsetBufferBuilder<i32>, Box<Decoder>),
    /// `(arrow fields, child decoders, per-field defaults, optional projector)`.
    Record(
        Fields,
        Vec<Decoder>,
        Vec<Option<AvroLiteral>>,
        Option<Projector>,
    ),
    /// `(entries field, key offsets, map offsets, bytes, value decoder)`.
    /// NOTE(review): the byte buffer presumably holds the concatenated key
    /// strings; confirm against the decode path.
    Map(
        FieldRef,
        OffsetBufferBuilder<i32>,
        OffsetBufferBuilder<i32>,
        Vec<u8>,
        Box<Decoder>,
    ),
    /// `(size in bytes, concatenated values)`.
    Fixed(i32, Vec<u8>),
    /// `(indices, symbols, optional resolution built from an enum mapping)`.
    Enum(Vec<i32>, Arc<[String]>, Option<EnumResolution>),
    /// Selected for the `Interval` codec; values are stored as
    /// month/day/nanosecond intervals.
    Duration(IntervalMonthDayNanoBuilder),
    /// Concatenated 16-byte UUID values.
    Uuid(Vec<u8>),
    /// `(precision, scale, fixed size, builder)`.
    #[cfg(feature = "small_decimals")]
    Decimal32(usize, Option<usize>, Option<usize>, Decimal32Builder),
    /// `(precision, scale, fixed size, builder)`.
    #[cfg(feature = "small_decimals")]
    Decimal64(usize, Option<usize>, Option<usize>, Decimal64Builder),
    /// `(precision, scale, fixed size, builder)`.
    Decimal128(usize, Option<usize>, Option<usize>, Decimal128Builder),
    /// `(precision, scale, fixed size, builder)`.
    Decimal256(usize, Option<usize>, Option<usize>, Decimal256Builder),
    /// `(run-end width in bytes, logical length, values decoder)`.
    #[cfg(feature = "avro_custom_types")]
    RunEndEncoded(u8, usize, Box<Decoder>),
    Union(UnionDecoder),
    /// `(plan for reading nullability, validity builder, inner decoder)`.
    Nullable(NullablePlan, NullBufferBuilder, Box<Decoder>),
}
281
282impl Decoder {
283 fn try_new(data_type: &AvroDataType) -> Result<Self, AvroError> {
284 if let Some(ResolutionInfo::Union(info)) = data_type.resolution.as_ref() {
285 if info.writer_is_union && !info.reader_is_union {
286 let mut clone = data_type.clone();
287 clone.resolution = None; let target = Self::try_new_internal(&clone)?;
289 let decoder = Self::Union(
290 UnionDecoderBuilder::new()
291 .with_resolved_union(info.clone())
292 .with_target(target)
293 .build()?,
294 );
295 return Ok(decoder);
296 }
297 }
298 Self::try_new_internal(data_type)
299 }
300
301 fn try_new_internal(data_type: &AvroDataType) -> Result<Self, AvroError> {
302 let promotion = match data_type.resolution.as_ref() {
304 Some(ResolutionInfo::Promotion(p)) => Some(*p),
305 _ => None,
306 };
307 let decoder = match (data_type.codec(), promotion) {
308 (Codec::Int64, Some(Promotion::IntToLong)) => {
309 Self::Int32ToInt64(Vec::with_capacity(DEFAULT_CAPACITY))
310 }
311 (Codec::Float32, Some(Promotion::IntToFloat)) => {
312 Self::Int32ToFloat32(Vec::with_capacity(DEFAULT_CAPACITY))
313 }
314 (Codec::Float64, Some(Promotion::IntToDouble)) => {
315 Self::Int32ToFloat64(Vec::with_capacity(DEFAULT_CAPACITY))
316 }
317 (Codec::Float32, Some(Promotion::LongToFloat)) => {
318 Self::Int64ToFloat32(Vec::with_capacity(DEFAULT_CAPACITY))
319 }
320 (Codec::Float64, Some(Promotion::LongToDouble)) => {
321 Self::Int64ToFloat64(Vec::with_capacity(DEFAULT_CAPACITY))
322 }
323 (Codec::Float64, Some(Promotion::FloatToDouble)) => {
324 Self::Float32ToFloat64(Vec::with_capacity(DEFAULT_CAPACITY))
325 }
326 (Codec::Utf8, Some(Promotion::BytesToString))
327 | (Codec::Utf8View, Some(Promotion::BytesToString)) => Self::BytesToString(
328 OffsetBufferBuilder::new(DEFAULT_CAPACITY),
329 Vec::with_capacity(DEFAULT_CAPACITY),
330 ),
331 (Codec::Binary, Some(Promotion::StringToBytes)) => Self::StringToBytes(
332 OffsetBufferBuilder::new(DEFAULT_CAPACITY),
333 Vec::with_capacity(DEFAULT_CAPACITY),
334 ),
335 (Codec::Null, _) => Self::Null(0),
336 (Codec::Boolean, _) => Self::Boolean(BooleanBufferBuilder::new(DEFAULT_CAPACITY)),
337 (Codec::Int32, _) => Self::Int32(Vec::with_capacity(DEFAULT_CAPACITY)),
338 (Codec::Int64, _) => Self::Int64(Vec::with_capacity(DEFAULT_CAPACITY)),
339 (Codec::Float32, _) => Self::Float32(Vec::with_capacity(DEFAULT_CAPACITY)),
340 (Codec::Float64, _) => Self::Float64(Vec::with_capacity(DEFAULT_CAPACITY)),
341 (Codec::Binary, _) => Self::Binary(
342 OffsetBufferBuilder::new(DEFAULT_CAPACITY),
343 Vec::with_capacity(DEFAULT_CAPACITY),
344 ),
345 (Codec::Utf8, _) => Self::String(
346 OffsetBufferBuilder::new(DEFAULT_CAPACITY),
347 Vec::with_capacity(DEFAULT_CAPACITY),
348 ),
349 (Codec::Utf8View, _) => Self::StringView(
350 OffsetBufferBuilder::new(DEFAULT_CAPACITY),
351 Vec::with_capacity(DEFAULT_CAPACITY),
352 ),
353 (Codec::Date32, _) => Self::Date32(Vec::with_capacity(DEFAULT_CAPACITY)),
354 (Codec::TimeMillis, _) => Self::TimeMillis(Vec::with_capacity(DEFAULT_CAPACITY)),
355 (Codec::TimeMicros, _) => Self::TimeMicros(Vec::with_capacity(DEFAULT_CAPACITY)),
356 (Codec::TimestampMillis(tz), _) => {
357 Self::TimestampMillis(*tz, Vec::with_capacity(DEFAULT_CAPACITY))
358 }
359 (Codec::TimestampMicros(tz), _) => {
360 Self::TimestampMicros(*tz, Vec::with_capacity(DEFAULT_CAPACITY))
361 }
362 (Codec::TimestampNanos(tz), _) => {
363 Self::TimestampNanos(*tz, Vec::with_capacity(DEFAULT_CAPACITY))
364 }
365 #[cfg(feature = "avro_custom_types")]
366 (Codec::DurationNanos, _) => {
367 Self::DurationNanosecond(Vec::with_capacity(DEFAULT_CAPACITY))
368 }
369 #[cfg(feature = "avro_custom_types")]
370 (Codec::DurationMicros, _) => {
371 Self::DurationMicrosecond(Vec::with_capacity(DEFAULT_CAPACITY))
372 }
373 #[cfg(feature = "avro_custom_types")]
374 (Codec::DurationMillis, _) => {
375 Self::DurationMillisecond(Vec::with_capacity(DEFAULT_CAPACITY))
376 }
377 #[cfg(feature = "avro_custom_types")]
378 (Codec::DurationSeconds, _) => {
379 Self::DurationSecond(Vec::with_capacity(DEFAULT_CAPACITY))
380 }
381 #[cfg(feature = "avro_custom_types")]
382 (Codec::Int8, _) => Self::Int8(Vec::with_capacity(DEFAULT_CAPACITY)),
383 #[cfg(feature = "avro_custom_types")]
384 (Codec::Int16, _) => Self::Int16(Vec::with_capacity(DEFAULT_CAPACITY)),
385 #[cfg(feature = "avro_custom_types")]
386 (Codec::UInt8, _) => Self::UInt8(Vec::with_capacity(DEFAULT_CAPACITY)),
387 #[cfg(feature = "avro_custom_types")]
388 (Codec::UInt16, _) => Self::UInt16(Vec::with_capacity(DEFAULT_CAPACITY)),
389 #[cfg(feature = "avro_custom_types")]
390 (Codec::UInt32, _) => Self::UInt32(Vec::with_capacity(DEFAULT_CAPACITY)),
391 #[cfg(feature = "avro_custom_types")]
392 (Codec::UInt64, _) => Self::UInt64(Vec::with_capacity(DEFAULT_CAPACITY)),
393 #[cfg(feature = "avro_custom_types")]
394 (Codec::Float16, _) => Self::Float16(Vec::with_capacity(DEFAULT_CAPACITY)),
395 #[cfg(feature = "avro_custom_types")]
396 (Codec::Date64, _) => Self::Date64(Vec::with_capacity(DEFAULT_CAPACITY)),
397 #[cfg(feature = "avro_custom_types")]
398 (Codec::TimeNanos, _) => Self::TimeNanos(Vec::with_capacity(DEFAULT_CAPACITY)),
399 #[cfg(feature = "avro_custom_types")]
400 (Codec::Time32Secs, _) => Self::Time32Secs(Vec::with_capacity(DEFAULT_CAPACITY)),
401 #[cfg(feature = "avro_custom_types")]
402 (Codec::TimestampSecs(is_utc), _) => {
403 Self::TimestampSecs(*is_utc, Vec::with_capacity(DEFAULT_CAPACITY))
404 }
405 #[cfg(feature = "avro_custom_types")]
406 (Codec::IntervalYearMonth, _) => {
407 Self::IntervalYearMonth(Vec::with_capacity(DEFAULT_CAPACITY))
408 }
409 #[cfg(feature = "avro_custom_types")]
410 (Codec::IntervalMonthDayNano, _) => {
411 Self::IntervalMonthDayNano(Vec::with_capacity(DEFAULT_CAPACITY))
412 }
413 #[cfg(feature = "avro_custom_types")]
414 (Codec::IntervalDayTime, _) => {
415 Self::IntervalDayTime(Vec::with_capacity(DEFAULT_CAPACITY))
416 }
417 (Codec::Fixed(sz), _) => Self::Fixed(*sz, Vec::with_capacity(DEFAULT_CAPACITY)),
418 (Codec::Decimal(precision, scale, size), _) => {
419 let p = *precision;
420 let s = *scale;
421 let prec = p as u8;
422 let scl = s.unwrap_or(0) as i8;
423 #[cfg(feature = "small_decimals")]
424 {
425 if p <= DECIMAL32_MAX_PRECISION as usize {
426 let builder = Decimal32Builder::with_capacity(DEFAULT_CAPACITY)
427 .with_precision_and_scale(prec, scl)?;
428 Self::Decimal32(p, s, *size, builder)
429 } else if p <= DECIMAL64_MAX_PRECISION as usize {
430 let builder = Decimal64Builder::with_capacity(DEFAULT_CAPACITY)
431 .with_precision_and_scale(prec, scl)?;
432 Self::Decimal64(p, s, *size, builder)
433 } else if p <= DECIMAL128_MAX_PRECISION as usize {
434 let builder = Decimal128Builder::with_capacity(DEFAULT_CAPACITY)
435 .with_precision_and_scale(prec, scl)?;
436 Self::Decimal128(p, s, *size, builder)
437 } else if p <= DECIMAL256_MAX_PRECISION as usize {
438 let builder = Decimal256Builder::with_capacity(DEFAULT_CAPACITY)
439 .with_precision_and_scale(prec, scl)?;
440 Self::Decimal256(p, s, *size, builder)
441 } else {
442 return Err(AvroError::ParseError(format!(
443 "Decimal precision {p} exceeds maximum supported"
444 )));
445 }
446 }
447 #[cfg(not(feature = "small_decimals"))]
448 {
449 if p <= DECIMAL128_MAX_PRECISION as usize {
450 let builder = Decimal128Builder::with_capacity(DEFAULT_CAPACITY)
451 .with_precision_and_scale(prec, scl)?;
452 Self::Decimal128(p, s, *size, builder)
453 } else if p <= DECIMAL256_MAX_PRECISION as usize {
454 let builder = Decimal256Builder::with_capacity(DEFAULT_CAPACITY)
455 .with_precision_and_scale(prec, scl)?;
456 Self::Decimal256(p, s, *size, builder)
457 } else {
458 return Err(AvroError::ParseError(format!(
459 "Decimal precision {p} exceeds maximum supported"
460 )));
461 }
462 }
463 }
464 (Codec::Interval, _) => Self::Duration(IntervalMonthDayNanoBuilder::new()),
465 (Codec::List(item), _) => {
466 let decoder = Self::try_new(item)?;
467 Self::Array(
468 Arc::new(item.field_with_name("item")),
469 OffsetBufferBuilder::new(DEFAULT_CAPACITY),
470 Box::new(decoder),
471 )
472 }
473 (Codec::Enum(symbols), _) => {
474 let res = match data_type.resolution.as_ref() {
475 Some(ResolutionInfo::EnumMapping(mapping)) => {
476 Some(EnumResolution::new(mapping))
477 }
478 _ => None,
479 };
480 Self::Enum(Vec::with_capacity(DEFAULT_CAPACITY), symbols.clone(), res)
481 }
482 (Codec::Struct(fields), _) => {
483 let mut arrow_fields = Vec::with_capacity(fields.len());
484 let mut encodings = Vec::with_capacity(fields.len());
485 let mut field_defaults = Vec::with_capacity(fields.len());
486 for avro_field in fields.iter() {
487 let encoding = Self::try_new(avro_field.data_type())?;
488 arrow_fields.push(avro_field.field());
489 encodings.push(encoding);
490
491 if let Some(ResolutionInfo::DefaultValue(lit)) =
492 avro_field.data_type().resolution.as_ref()
493 {
494 field_defaults.push(Some(lit.clone()));
495 } else {
496 field_defaults.push(None);
497 }
498 }
499 let projector =
500 if let Some(ResolutionInfo::Record(rec)) = data_type.resolution.as_ref() {
501 Some(ProjectorBuilder::try_new(rec, &field_defaults).build()?)
502 } else {
503 None
504 };
505 Self::Record(arrow_fields.into(), encodings, field_defaults, projector)
506 }
507 (Codec::Map(child), _) => {
508 let val_field = child.field_with_name("value");
509 let map_field = Arc::new(ArrowField::new(
510 "entries",
511 DataType::Struct(Fields::from(vec![
512 ArrowField::new("key", DataType::Utf8, false),
513 val_field,
514 ])),
515 false,
516 ));
517 let val_dec = Self::try_new(child)?;
518 Self::Map(
519 map_field,
520 OffsetBufferBuilder::new(DEFAULT_CAPACITY),
521 OffsetBufferBuilder::new(DEFAULT_CAPACITY),
522 Vec::with_capacity(DEFAULT_CAPACITY),
523 Box::new(val_dec),
524 )
525 }
526 (Codec::Uuid, _) => Self::Uuid(Vec::with_capacity(DEFAULT_CAPACITY)),
527 (Codec::Union(encodings, fields, UnionMode::Dense), _) => {
528 let decoders = encodings
529 .iter()
530 .map(Self::try_new_internal)
531 .collect::<Result<Vec<_>, _>>()?;
532 if fields.len() != decoders.len() {
533 return Err(AvroError::SchemaError(format!(
534 "Union has {} fields but {} decoders",
535 fields.len(),
536 decoders.len()
537 )));
538 }
539 let branch_count = decoders.len();
542 let max_addr = (i32::MAX as usize) + 1;
543 if branch_count > max_addr {
544 return Err(AvroError::SchemaError(format!(
545 "Union has {branch_count} branches, which exceeds the maximum addressable \
546 branches by an Avro int tag ({} + 1).",
547 i32::MAX
548 )));
549 }
550 let mut builder = UnionDecoderBuilder::new()
551 .with_fields(fields.clone())
552 .with_branches(decoders);
553 if let Some(ResolutionInfo::Union(info)) = data_type.resolution.as_ref() {
554 if info.reader_is_union {
555 builder = builder.with_resolved_union(info.clone());
556 }
557 }
558 Self::Union(builder.build()?)
559 }
560 (Codec::Union(_, _, _), _) => {
561 return Err(AvroError::NYI(
562 "Sparse Arrow unions are not yet supported".to_string(),
563 ));
564 }
565 #[cfg(feature = "avro_custom_types")]
566 (Codec::RunEndEncoded(values_dt, width_bits_or_bytes), _) => {
567 let inner = Self::try_new(values_dt)?;
568 let byte_width: u8 = match *width_bits_or_bytes {
569 2 | 4 | 8 => *width_bits_or_bytes,
570 16 => 2,
571 32 => 4,
572 64 => 8,
573 other => {
574 return Err(AvroError::InvalidArgument(format!(
575 "Unsupported run-end width {other} for RunEndEncoded; \
576 expected 16/32/64 bits or 2/4/8 bytes"
577 )));
578 }
579 };
580 Self::RunEndEncoded(byte_width, 0, Box::new(inner))
581 }
582 };
583 Ok(match data_type.nullability() {
584 Some(nullability) => {
585 let plan = match &data_type.resolution {
587 None => NullablePlan::ReadTag {
588 nullability,
589 resolution: ResolutionPlan::Promotion(Promotion::Direct),
590 },
591 Some(ResolutionInfo::Promotion(_)) => {
592 NullablePlan::FromSingle {
595 resolution: ResolutionPlan::Promotion(Promotion::Direct),
596 }
597 }
598 Some(ResolutionInfo::Union(info)) if !info.writer_is_union => {
599 let Some(Some((_, resolution))) = info.writer_to_reader.first() else {
600 return Err(AvroError::SchemaError(
601 "unexpected union resolution info for non-union writer and union reader type".into(),
602 ));
603 };
604 let resolution = ResolutionPlan::try_new(&decoder, resolution)?;
605 NullablePlan::FromSingle { resolution }
606 }
607 Some(ResolutionInfo::Union(info)) => {
608 let Some((_, resolution)) =
609 info.writer_to_reader[nullability.non_null_index()].as_ref()
610 else {
611 return Err(AvroError::SchemaError(
612 "unexpected union resolution info for nullable writer type".into(),
613 ));
614 };
615 NullablePlan::ReadTag {
616 nullability,
617 resolution: ResolutionPlan::try_new(&decoder, resolution)?,
618 }
619 }
620 Some(resolution) => NullablePlan::FromSingle {
621 resolution: ResolutionPlan::try_new(&decoder, resolution)?,
622 },
623 };
624 Self::Nullable(
625 plan,
626 NullBufferBuilder::new(DEFAULT_CAPACITY),
627 Box::new(decoder),
628 )
629 }
630 None => decoder,
631 })
632 }
633
634 fn append_null(&mut self) -> Result<(), AvroError> {
636 match self {
637 Self::Null(count) => *count += 1,
638 Self::Boolean(b) => b.append(false),
639 Self::Int32(v) | Self::Date32(v) | Self::TimeMillis(v) => v.push(0),
640 Self::Int64(v)
641 | Self::Int32ToInt64(v)
642 | Self::TimeMicros(v)
643 | Self::TimestampMillis(_, v)
644 | Self::TimestampMicros(_, v)
645 | Self::TimestampNanos(_, v) => v.push(0),
646 #[cfg(feature = "avro_custom_types")]
647 Self::DurationSecond(v)
648 | Self::DurationMillisecond(v)
649 | Self::DurationMicrosecond(v)
650 | Self::DurationNanosecond(v) => v.push(0),
651 #[cfg(feature = "avro_custom_types")]
652 Self::Int8(v) => v.push(0),
653 #[cfg(feature = "avro_custom_types")]
654 Self::Int16(v) => v.push(0),
655 #[cfg(feature = "avro_custom_types")]
656 Self::UInt8(v) => v.push(0),
657 #[cfg(feature = "avro_custom_types")]
658 Self::UInt16(v) => v.push(0),
659 #[cfg(feature = "avro_custom_types")]
660 Self::UInt32(v) => v.push(0),
661 #[cfg(feature = "avro_custom_types")]
662 Self::UInt64(v) => v.push(0),
663 #[cfg(feature = "avro_custom_types")]
664 Self::Float16(v) => v.push(0),
665 #[cfg(feature = "avro_custom_types")]
666 Self::Date64(v) | Self::TimeNanos(v) | Self::TimestampSecs(_, v) => v.push(0),
667 #[cfg(feature = "avro_custom_types")]
668 Self::IntervalDayTime(v) => v.push(IntervalDayTime::new(0, 0)),
669 #[cfg(feature = "avro_custom_types")]
670 Self::IntervalMonthDayNano(v) => v.push(IntervalMonthDayNano::new(0, 0, 0)),
671 #[cfg(feature = "avro_custom_types")]
672 Self::Time32Secs(v) | Self::IntervalYearMonth(v) => v.push(0),
673 Self::Float32(v) | Self::Int32ToFloat32(v) | Self::Int64ToFloat32(v) => v.push(0.),
674 Self::Float64(v)
675 | Self::Int32ToFloat64(v)
676 | Self::Int64ToFloat64(v)
677 | Self::Float32ToFloat64(v) => v.push(0.),
678 Self::Binary(offsets, _)
679 | Self::String(offsets, _)
680 | Self::StringView(offsets, _)
681 | Self::BytesToString(offsets, _)
682 | Self::StringToBytes(offsets, _) => {
683 offsets.push_length(0);
684 }
685 Self::Uuid(v) => {
686 v.extend([0; 16]);
687 }
688 Self::Array(_, offsets, _) => {
689 offsets.push_length(0);
690 }
691 Self::Record(_, e, _, _) => {
692 for encoding in e.iter_mut() {
693 encoding.append_null()?;
694 }
695 }
696 Self::Map(_, _koff, moff, _, _) => {
697 moff.push_length(0);
698 }
699 Self::Fixed(sz, accum) => {
700 accum.extend(std::iter::repeat_n(0u8, *sz as usize));
701 }
702 #[cfg(feature = "small_decimals")]
703 Self::Decimal32(_, _, _, builder) => builder.append_value(0),
704 #[cfg(feature = "small_decimals")]
705 Self::Decimal64(_, _, _, builder) => builder.append_value(0),
706 Self::Decimal128(_, _, _, builder) => builder.append_value(0),
707 Self::Decimal256(_, _, _, builder) => builder.append_value(i256::ZERO),
708 Self::Enum(indices, _, _) => indices.push(0),
709 Self::Duration(builder) => builder.append_null(),
710 #[cfg(feature = "avro_custom_types")]
711 Self::RunEndEncoded(_, len, inner) => {
712 *len += 1;
713 inner.append_null()?;
714 }
715 Self::Union(u) => u.append_null()?,
716 Self::Nullable(_, null_buffer, inner) => {
717 null_buffer.append(false);
718 inner.append_null()?;
719 }
720 }
721 Ok(())
722 }
723
724 fn append_default(&mut self, lit: &AvroLiteral) -> Result<(), AvroError> {
726 match self {
727 Self::Nullable(_, nb, inner) => {
728 if matches!(lit, AvroLiteral::Null) {
729 nb.append(false);
730 inner.append_null()
731 } else {
732 nb.append(true);
733 inner.append_default(lit)
734 }
735 }
736 Self::Null(count) => match lit {
737 AvroLiteral::Null => {
738 *count += 1;
739 Ok(())
740 }
741 _ => Err(AvroError::InvalidArgument(
742 "Non-null default for null type".to_string(),
743 )),
744 },
745 Self::Boolean(b) => match lit {
746 AvroLiteral::Boolean(v) => {
747 b.append(*v);
748 Ok(())
749 }
750 _ => Err(AvroError::InvalidArgument(
751 "Default for boolean must be boolean".to_string(),
752 )),
753 },
754 Self::Int32(v) | Self::Date32(v) | Self::TimeMillis(v) => match lit {
755 AvroLiteral::Int(i) => {
756 v.push(*i);
757 Ok(())
758 }
759 _ => Err(AvroError::InvalidArgument(
760 "Default for int32/date32/time-millis must be int".to_string(),
761 )),
762 },
763 #[cfg(feature = "avro_custom_types")]
764 Self::DurationSecond(v)
765 | Self::DurationMillisecond(v)
766 | Self::DurationMicrosecond(v)
767 | Self::DurationNanosecond(v) => match lit {
768 AvroLiteral::Long(i) => {
769 v.push(*i);
770 Ok(())
771 }
772 _ => Err(AvroError::InvalidArgument(
773 "Default for duration long must be long".to_string(),
774 )),
775 },
776 #[cfg(feature = "avro_custom_types")]
777 Self::Int8(v) => match lit {
778 AvroLiteral::Int(i) => {
779 let x = i8::try_from(*i).map_err(|_| {
780 AvroError::InvalidArgument(format!(
781 "Default for int8 out of range for i8: {i}"
782 ))
783 })?;
784 v.push(x);
785 Ok(())
786 }
787 _ => Err(AvroError::InvalidArgument(
788 "Default for int8 must be int".to_string(),
789 )),
790 },
791 #[cfg(feature = "avro_custom_types")]
792 Self::Int16(v) => match lit {
793 AvroLiteral::Int(i) => {
794 let x = i16::try_from(*i).map_err(|_| {
795 AvroError::InvalidArgument(format!(
796 "Default for int16 out of range for i16: {i}"
797 ))
798 })?;
799 v.push(x);
800 Ok(())
801 }
802 _ => Err(AvroError::InvalidArgument(
803 "Default for int16 must be int".to_string(),
804 )),
805 },
806 #[cfg(feature = "avro_custom_types")]
807 Self::UInt8(v) => match lit {
808 AvroLiteral::Int(i) => {
809 let x = u8::try_from(*i).map_err(|_| {
810 AvroError::InvalidArgument(format!(
811 "Default for uint8 out of range for u8: {i}"
812 ))
813 })?;
814 v.push(x);
815 Ok(())
816 }
817 _ => Err(AvroError::InvalidArgument(
818 "Default for uint8 must be int".to_string(),
819 )),
820 },
821 #[cfg(feature = "avro_custom_types")]
822 Self::UInt16(v) => match lit {
823 AvroLiteral::Int(i) => {
824 let x = u16::try_from(*i).map_err(|_| {
825 AvroError::InvalidArgument(format!(
826 "Default for uint16 out of range for u16: {i}"
827 ))
828 })?;
829 v.push(x);
830 Ok(())
831 }
832 _ => Err(AvroError::InvalidArgument(
833 "Default for uint16 must be int".to_string(),
834 )),
835 },
836 #[cfg(feature = "avro_custom_types")]
837 Self::UInt32(v) => match lit {
838 AvroLiteral::Long(i) => {
839 let x = u32::try_from(*i).map_err(|_| {
840 AvroError::InvalidArgument(format!(
841 "Default for uint32 out of range for u32: {i}"
842 ))
843 })?;
844 v.push(x);
845 Ok(())
846 }
847 _ => Err(AvroError::InvalidArgument(
848 "Default for uint32 must be long".to_string(),
849 )),
850 },
851 #[cfg(feature = "avro_custom_types")]
852 Self::UInt64(v) => match lit {
853 AvroLiteral::Bytes(b) => {
854 if b.len() != 8 {
855 return Err(AvroError::InvalidArgument(format!(
856 "uint64 default must be exactly 8 bytes, got {}",
857 b.len()
858 )));
859 }
860 v.push(u64::from_le_bytes([
861 b[0], b[1], b[2], b[3], b[4], b[5], b[6], b[7],
862 ]));
863 Ok(())
864 }
865 _ => Err(AvroError::InvalidArgument(
866 "Default for uint64 must be bytes (8-byte LE)".to_string(),
867 )),
868 },
869 #[cfg(feature = "avro_custom_types")]
870 Self::Float16(v) => match lit {
871 AvroLiteral::Bytes(b) => {
872 if b.len() != 2 {
873 return Err(AvroError::InvalidArgument(format!(
874 "float16 default must be exactly 2 bytes, got {}",
875 b.len()
876 )));
877 }
878 v.push(u16::from_le_bytes([b[0], b[1]]));
879 Ok(())
880 }
881 _ => Err(AvroError::InvalidArgument(
882 "Default for float16 must be bytes (2-byte LE IEEE-754)".to_string(),
883 )),
884 },
885 #[cfg(feature = "avro_custom_types")]
886 Self::Date64(v) | Self::TimeNanos(v) | Self::TimestampSecs(_, v) => match lit {
887 AvroLiteral::Long(i) => {
888 v.push(*i);
889 Ok(())
890 }
891 _ => Err(AvroError::InvalidArgument(
892 "Default for date64/time-nanos/timestamp-secs must be long".to_string(),
893 )),
894 },
895 #[cfg(feature = "avro_custom_types")]
896 Self::Time32Secs(v) => match lit {
897 AvroLiteral::Int(i) => {
898 v.push(*i);
899 Ok(())
900 }
901 _ => Err(AvroError::InvalidArgument(
902 "Default for time32-secs must be int".to_string(),
903 )),
904 },
905 #[cfg(feature = "avro_custom_types")]
906 Self::IntervalYearMonth(v) => match lit {
907 AvroLiteral::Bytes(b) => {
908 if b.len() != 4 {
909 return Err(AvroError::InvalidArgument(format!(
910 "interval-year-month default must be exactly 4 bytes, got {}",
911 b.len()
912 )));
913 }
914 v.push(i32::from_le_bytes([b[0], b[1], b[2], b[3]]));
915 Ok(())
916 }
917 _ => Err(AvroError::InvalidArgument(
918 "Default for interval-year-month must be bytes (4-byte LE)".to_string(),
919 )),
920 },
921 #[cfg(feature = "avro_custom_types")]
922 Self::IntervalMonthDayNano(v) => match lit {
923 AvroLiteral::Bytes(b) => {
924 if b.len() != 16 {
925 return Err(AvroError::InvalidArgument(format!(
926 "interval-month-day-nano default must be exactly 16 bytes, got {}",
927 b.len()
928 )));
929 }
930 let months = i32::from_le_bytes([b[0], b[1], b[2], b[3]]);
931 let days = i32::from_le_bytes([b[4], b[5], b[6], b[7]]);
932 let nanos =
933 i64::from_le_bytes([b[8], b[9], b[10], b[11], b[12], b[13], b[14], b[15]]);
934 v.push(IntervalMonthDayNano::new(months, days, nanos));
935 Ok(())
936 }
937 _ => Err(AvroError::InvalidArgument(
938 "Default for interval-month-day-nano must be bytes (16-byte LE)".to_string(),
939 )),
940 },
941 #[cfg(feature = "avro_custom_types")]
942 Self::IntervalDayTime(v) => match lit {
943 AvroLiteral::Bytes(b) => {
944 if b.len() != 8 {
945 return Err(AvroError::InvalidArgument(format!(
946 "interval-day-time default must be exactly 8 bytes, got {}",
947 b.len()
948 )));
949 }
950 let days = i32::from_le_bytes([b[0], b[1], b[2], b[3]]);
951 let milliseconds = i32::from_le_bytes([b[4], b[5], b[6], b[7]]);
952 v.push(IntervalDayTime::new(days, milliseconds));
953 Ok(())
954 }
955 _ => Err(AvroError::InvalidArgument(
956 "Default for interval-day-time must be bytes (8-byte LE)".to_string(),
957 )),
958 },
959 Self::Int64(v)
960 | Self::Int32ToInt64(v)
961 | Self::TimeMicros(v)
962 | Self::TimestampMillis(_, v)
963 | Self::TimestampMicros(_, v)
964 | Self::TimestampNanos(_, v) => match lit {
965 AvroLiteral::Long(i) => {
966 v.push(*i);
967 Ok(())
968 }
969 AvroLiteral::Int(i) => {
970 v.push(*i as i64);
971 Ok(())
972 }
973 _ => Err(AvroError::InvalidArgument(
974 "Default for long/time-micros/timestamp must be long or int".to_string(),
975 )),
976 },
977 Self::Float32(v) | Self::Int32ToFloat32(v) | Self::Int64ToFloat32(v) => match lit {
978 AvroLiteral::Float(f) => {
979 v.push(*f);
980 Ok(())
981 }
982 _ => Err(AvroError::InvalidArgument(
983 "Default for float must be float".to_string(),
984 )),
985 },
986 Self::Float64(v)
987 | Self::Int32ToFloat64(v)
988 | Self::Int64ToFloat64(v)
989 | Self::Float32ToFloat64(v) => match lit {
990 AvroLiteral::Double(f) => {
991 v.push(*f);
992 Ok(())
993 }
994 _ => Err(AvroError::InvalidArgument(
995 "Default for double must be double".to_string(),
996 )),
997 },
998 Self::Binary(offsets, values) | Self::StringToBytes(offsets, values) => match lit {
999 AvroLiteral::Bytes(b) => {
1000 offsets.push_length(b.len());
1001 values.extend_from_slice(b);
1002 Ok(())
1003 }
1004 _ => Err(AvroError::InvalidArgument(
1005 "Default for bytes must be bytes".to_string(),
1006 )),
1007 },
1008 Self::BytesToString(offsets, values)
1009 | Self::String(offsets, values)
1010 | Self::StringView(offsets, values) => match lit {
1011 AvroLiteral::String(s) => {
1012 let b = s.as_bytes();
1013 offsets.push_length(b.len());
1014 values.extend_from_slice(b);
1015 Ok(())
1016 }
1017 _ => Err(AvroError::InvalidArgument(
1018 "Default for string must be string".to_string(),
1019 )),
1020 },
1021 Self::Uuid(values) => match lit {
1022 AvroLiteral::String(s) => {
1023 let uuid = Uuid::try_parse(s).map_err(|e| {
1024 AvroError::InvalidArgument(format!("Invalid UUID default: {s} ({e})"))
1025 })?;
1026 values.extend_from_slice(uuid.as_bytes());
1027 Ok(())
1028 }
1029 _ => Err(AvroError::InvalidArgument(
1030 "Default for uuid must be string".to_string(),
1031 )),
1032 },
1033 Self::Fixed(sz, accum) => match lit {
1034 AvroLiteral::Bytes(b) => {
1035 if b.len() != *sz as usize {
1036 return Err(AvroError::InvalidArgument(format!(
1037 "Fixed default length {} does not match size {sz}",
1038 b.len(),
1039 )));
1040 }
1041 accum.extend_from_slice(b);
1042 Ok(())
1043 }
1044 _ => Err(AvroError::InvalidArgument(
1045 "Default for fixed must be bytes".to_string(),
1046 )),
1047 },
1048 #[cfg(feature = "small_decimals")]
1049 Self::Decimal32(_, _, _, builder) => {
1050 append_decimal_default!(lit, builder, 4, i32, "decimal32")
1051 }
1052 #[cfg(feature = "small_decimals")]
1053 Self::Decimal64(_, _, _, builder) => {
1054 append_decimal_default!(lit, builder, 8, i64, "decimal64")
1055 }
1056 Self::Decimal128(_, _, _, builder) => {
1057 append_decimal_default!(lit, builder, 16, i128, "decimal128")
1058 }
1059 Self::Decimal256(_, _, _, builder) => {
1060 append_decimal_default!(lit, builder, 32, i256, "decimal256")
1061 }
1062 Self::Duration(builder) => match lit {
1063 AvroLiteral::Bytes(b) => {
1064 if b.len() != 12 {
1065 return Err(AvroError::InvalidArgument(format!(
1066 "Duration default must be exactly 12 bytes, got {}",
1067 b.len()
1068 )));
1069 }
1070 let months = u32::from_le_bytes([b[0], b[1], b[2], b[3]]);
1071 let days = u32::from_le_bytes([b[4], b[5], b[6], b[7]]);
1072 let millis = u32::from_le_bytes([b[8], b[9], b[10], b[11]]);
1073 let nanos = (millis as i64) * 1_000_000;
1074 builder.append_value(IntervalMonthDayNano::new(
1075 months as i32,
1076 days as i32,
1077 nanos,
1078 ));
1079 Ok(())
1080 }
1081 _ => Err(AvroError::InvalidArgument(
1082 "Default for duration must be 12-byte little-endian months/days/millis"
1083 .to_string(),
1084 )),
1085 },
1086 Self::Array(_, offsets, inner) => match lit {
1087 AvroLiteral::Array(items) => {
1088 offsets.push_length(items.len());
1089 for item in items {
1090 inner.append_default(item)?;
1091 }
1092 Ok(())
1093 }
1094 _ => Err(AvroError::InvalidArgument(
1095 "Default for array must be an array literal".to_string(),
1096 )),
1097 },
1098 Self::Map(_, koff, moff, kdata, valdec) => match lit {
1099 AvroLiteral::Map(entries) => {
1100 moff.push_length(entries.len());
1101 for (k, v) in entries {
1102 let kb = k.as_bytes();
1103 koff.push_length(kb.len());
1104 kdata.extend_from_slice(kb);
1105 valdec.append_default(v)?;
1106 }
1107 Ok(())
1108 }
1109 _ => Err(AvroError::InvalidArgument(
1110 "Default for map must be a map/object literal".to_string(),
1111 )),
1112 },
1113 Self::Enum(indices, symbols, _) => match lit {
1114 AvroLiteral::Enum(sym) => {
1115 let pos = symbols.iter().position(|s| s == sym).ok_or_else(|| {
1116 AvroError::InvalidArgument(format!(
1117 "Enum default symbol {sym:?} not in reader symbols"
1118 ))
1119 })?;
1120 indices.push(pos as i32);
1121 Ok(())
1122 }
1123 _ => Err(AvroError::InvalidArgument(
1124 "Default for enum must be a symbol".to_string(),
1125 )),
1126 },
1127 #[cfg(feature = "avro_custom_types")]
1128 Self::RunEndEncoded(_, len, inner) => {
1129 *len += 1;
1130 inner.append_default(lit)
1131 }
1132 Self::Union(u) => u.append_default(lit),
1133 Self::Record(field_meta, decoders, field_defaults, _) => match lit {
1134 AvroLiteral::Map(entries) => {
1135 for (i, dec) in decoders.iter_mut().enumerate() {
1136 let name = field_meta[i].name();
1137 if let Some(sub) = entries.get(name) {
1138 dec.append_default(sub)?;
1139 } else if let Some(default_literal) = field_defaults[i].as_ref() {
1140 dec.append_default(default_literal)?;
1141 } else {
1142 dec.append_null()?;
1143 }
1144 }
1145 Ok(())
1146 }
1147 AvroLiteral::Null => {
1148 for (i, dec) in decoders.iter_mut().enumerate() {
1149 if let Some(default_literal) = field_defaults[i].as_ref() {
1150 dec.append_default(default_literal)?;
1151 } else {
1152 dec.append_null()?;
1153 }
1154 }
1155 Ok(())
1156 }
1157 _ => Err(AvroError::InvalidArgument(
1158 "Default for record must be a map/object or null".to_string(),
1159 )),
1160 },
1161 }
1162 }
1163
1164 fn decode(&mut self, buf: &mut AvroCursor<'_>) -> Result<(), AvroError> {
1166 match self {
1167 Self::Null(x) => *x += 1,
1168 Self::Boolean(values) => values.append(buf.get_bool()?),
1169 Self::Int32(values) | Self::Date32(values) | Self::TimeMillis(values) => {
1170 values.push(buf.get_int()?)
1171 }
1172 Self::Int64(values)
1173 | Self::TimeMicros(values)
1174 | Self::TimestampMillis(_, values)
1175 | Self::TimestampMicros(_, values)
1176 | Self::TimestampNanos(_, values) => values.push(buf.get_long()?),
1177 #[cfg(feature = "avro_custom_types")]
1178 Self::DurationSecond(values)
1179 | Self::DurationMillisecond(values)
1180 | Self::DurationMicrosecond(values)
1181 | Self::DurationNanosecond(values) => values.push(buf.get_long()?),
1182 #[cfg(feature = "avro_custom_types")]
1183 Self::Int8(values) => {
1184 let raw = buf.get_int()?;
1185 let x = i8::try_from(raw).map_err(|_| {
1186 AvroError::ParseError(format!("int8 value {raw} out of range for i8"))
1187 })?;
1188 values.push(x);
1189 }
1190 #[cfg(feature = "avro_custom_types")]
1191 Self::Int16(values) => {
1192 let raw = buf.get_int()?;
1193 let x = i16::try_from(raw).map_err(|_| {
1194 AvroError::ParseError(format!("int16 value {raw} out of range for i16"))
1195 })?;
1196 values.push(x);
1197 }
1198 #[cfg(feature = "avro_custom_types")]
1199 Self::UInt8(values) => {
1200 let raw = buf.get_int()?;
1201 let x = u8::try_from(raw).map_err(|_| {
1202 AvroError::ParseError(format!("uint8 value {raw} out of range for u8"))
1203 })?;
1204 values.push(x);
1205 }
1206 #[cfg(feature = "avro_custom_types")]
1207 Self::UInt16(values) => {
1208 let raw = buf.get_int()?;
1209 let x = u16::try_from(raw).map_err(|_| {
1210 AvroError::ParseError(format!("uint16 value {raw} out of range for u16"))
1211 })?;
1212 values.push(x);
1213 }
1214 #[cfg(feature = "avro_custom_types")]
1215 Self::UInt32(values) => {
1216 let raw = buf.get_long()?;
1217 let x = u32::try_from(raw).map_err(|_| {
1218 AvroError::ParseError(format!("uint32 value {raw} out of range for u32"))
1219 })?;
1220 values.push(x);
1221 }
1222 #[cfg(feature = "avro_custom_types")]
1223 Self::UInt64(values) => {
1224 let b = buf.get_fixed(8)?;
1225 values.push(u64::from_le_bytes([
1226 b[0], b[1], b[2], b[3], b[4], b[5], b[6], b[7],
1227 ]));
1228 }
1229 #[cfg(feature = "avro_custom_types")]
1230 Self::Float16(values) => {
1231 let b = buf.get_fixed(2)?;
1232 values.push(u16::from_le_bytes([b[0], b[1]]));
1233 }
1234 #[cfg(feature = "avro_custom_types")]
1235 Self::Date64(values) | Self::TimeNanos(values) | Self::TimestampSecs(_, values) => {
1236 values.push(buf.get_long()?)
1237 }
1238 #[cfg(feature = "avro_custom_types")]
1239 Self::Time32Secs(values) => values.push(buf.get_int()?),
1240 #[cfg(feature = "avro_custom_types")]
1241 Self::IntervalYearMonth(values) => {
1242 let b = buf.get_fixed(4)?;
1243 values.push(i32::from_le_bytes([b[0], b[1], b[2], b[3]]));
1244 }
1245 #[cfg(feature = "avro_custom_types")]
1246 Self::IntervalMonthDayNano(values) => {
1247 let b = buf.get_fixed(16)?;
1248 let months = i32::from_le_bytes([b[0], b[1], b[2], b[3]]);
1249 let days = i32::from_le_bytes([b[4], b[5], b[6], b[7]]);
1250 let nanos =
1251 i64::from_le_bytes([b[8], b[9], b[10], b[11], b[12], b[13], b[14], b[15]]);
1252 values.push(IntervalMonthDayNano::new(months, days, nanos));
1253 }
1254 #[cfg(feature = "avro_custom_types")]
1255 Self::IntervalDayTime(values) => {
1256 let b = buf.get_fixed(8)?;
1257 let days = i32::from_le_bytes([b[0], b[1], b[2], b[3]]);
1259 let milliseconds = i32::from_le_bytes([b[4], b[5], b[6], b[7]]);
1260 values.push(IntervalDayTime::new(days, milliseconds));
1261 }
1262 Self::Float32(values) => values.push(buf.get_float()?),
1263 Self::Float64(values) => values.push(buf.get_double()?),
1264 Self::Int32ToInt64(values) => values.push(buf.get_int()? as i64),
1265 Self::Int32ToFloat32(values) => values.push(buf.get_int()? as f32),
1266 Self::Int32ToFloat64(values) => values.push(buf.get_int()? as f64),
1267 Self::Int64ToFloat32(values) => values.push(buf.get_long()? as f32),
1268 Self::Int64ToFloat64(values) => values.push(buf.get_long()? as f64),
1269 Self::Float32ToFloat64(values) => values.push(buf.get_float()? as f64),
1270 Self::StringToBytes(offsets, values)
1271 | Self::BytesToString(offsets, values)
1272 | Self::Binary(offsets, values)
1273 | Self::String(offsets, values)
1274 | Self::StringView(offsets, values) => {
1275 let data = buf.get_bytes()?;
1276 offsets.push_length(data.len());
1277 values.extend_from_slice(data);
1278 }
1279 Self::Uuid(values) => {
1280 let s_bytes = buf.get_bytes()?;
1281 let s = std::str::from_utf8(s_bytes).map_err(|e| {
1282 AvroError::ParseError(format!("UUID bytes are not valid UTF-8: {e}"))
1283 })?;
1284 let uuid = Uuid::try_parse(s)
1285 .map_err(|e| AvroError::ParseError(format!("Failed to parse uuid: {e}")))?;
1286 values.extend_from_slice(uuid.as_bytes());
1287 }
1288 Self::Array(_, off, encoding) => {
1289 let total_items = read_blocks(buf, |cursor| encoding.decode(cursor))?;
1290 off.push_length(total_items);
1291 }
1292 Self::Record(_, encodings, _, None) => {
1293 for encoding in encodings {
1294 encoding.decode(buf)?;
1295 }
1296 }
1297 Self::Record(_, encodings, _, Some(proj)) => {
1298 proj.project_record(buf, encodings)?;
1299 }
1300 Self::Map(_, koff, moff, kdata, valdec) => {
1301 let newly_added = read_blocks(buf, |cur| {
1302 let kb = cur.get_bytes()?;
1303 koff.push_length(kb.len());
1304 kdata.extend_from_slice(kb);
1305 valdec.decode(cur)
1306 })?;
1307 moff.push_length(newly_added);
1308 }
1309 Self::Fixed(sz, accum) => {
1310 let fx = buf.get_fixed(*sz as usize)?;
1311 accum.extend_from_slice(fx);
1312 }
1313 #[cfg(feature = "small_decimals")]
1314 Self::Decimal32(_, _, size, builder) => {
1315 decode_decimal!(size, buf, builder, 4, i32);
1316 }
1317 #[cfg(feature = "small_decimals")]
1318 Self::Decimal64(_, _, size, builder) => {
1319 decode_decimal!(size, buf, builder, 8, i64);
1320 }
1321 Self::Decimal128(_, _, size, builder) => {
1322 decode_decimal!(size, buf, builder, 16, i128);
1323 }
1324 Self::Decimal256(_, _, size, builder) => {
1325 decode_decimal!(size, buf, builder, 32, i256);
1326 }
1327 Self::Enum(indices, _, None) => {
1328 indices.push(buf.get_int()?);
1329 }
1330 Self::Enum(indices, _, Some(res)) => {
1331 let raw = buf.get_int()?;
1332 let resolved = res.resolve(raw)?;
1333 indices.push(resolved);
1334 }
1335 Self::Duration(builder) => {
1336 let b = buf.get_fixed(12)?;
1337 let months = u32::from_le_bytes([b[0], b[1], b[2], b[3]]);
1338 let days = u32::from_le_bytes([b[4], b[5], b[6], b[7]]);
1339 let millis = u32::from_le_bytes([b[8], b[9], b[10], b[11]]);
1340 let nanos = (millis as i64) * 1_000_000;
1341 builder.append_value(IntervalMonthDayNano::new(months as i32, days as i32, nanos));
1342 }
1343 #[cfg(feature = "avro_custom_types")]
1344 Self::RunEndEncoded(_, len, inner) => {
1345 *len += 1;
1346 inner.decode(buf)?;
1347 }
1348 Self::Union(u) => u.decode(buf)?,
1349 Self::Nullable(plan, nb, encoding) => {
1350 match plan {
1351 NullablePlan::FromSingle { resolution } => {
1352 encoding.decode_with_resolution(buf, resolution)?;
1353 nb.append(true);
1354 }
1355 NullablePlan::ReadTag {
1356 nullability,
1357 resolution,
1358 } => {
1359 let branch = buf.read_vlq()?;
1360 let is_not_null = match *nullability {
1361 Nullability::NullFirst => branch != 0,
1362 Nullability::NullSecond => branch == 0,
1363 };
1364 if is_not_null {
1365 encoding.decode_with_resolution(buf, resolution)?;
1367 } else {
1368 encoding.append_null()?;
1369 }
1370 nb.append(is_not_null);
1371 }
1372 }
1373 }
1374 }
1375 Ok(())
1376 }
1377
1378 fn decode_with_promotion(
1379 &mut self,
1380 buf: &mut AvroCursor<'_>,
1381 promotion: Promotion,
1382 ) -> Result<(), AvroError> {
1383 #[cfg(feature = "avro_custom_types")]
1384 if let Self::RunEndEncoded(_, len, inner) = self {
1385 *len += 1;
1386 return inner.decode_with_promotion(buf, promotion);
1387 }
1388
1389 macro_rules! promote_numeric_to {
1390 ($variant:ident, $getter:ident, $to:ty) => {{
1391 match self {
1392 Self::$variant(v) => {
1393 let x = buf.$getter()?;
1394 v.push(x as $to);
1395 Ok(())
1396 }
1397 other => Err(AvroError::ParseError(format!(
1398 "Promotion {promotion} target mismatch: expected {}, got {}",
1399 stringify!($variant),
1400 <Self as ::std::convert::AsRef<str>>::as_ref(other)
1401 ))),
1402 }
1403 }};
1404 }
1405 match promotion {
1406 Promotion::Direct => self.decode(buf),
1407 Promotion::IntToLong => promote_numeric_to!(Int64, get_int, i64),
1408 Promotion::IntToFloat => promote_numeric_to!(Float32, get_int, f32),
1409 Promotion::IntToDouble => promote_numeric_to!(Float64, get_int, f64),
1410 Promotion::LongToFloat => promote_numeric_to!(Float32, get_long, f32),
1411 Promotion::LongToDouble => promote_numeric_to!(Float64, get_long, f64),
1412 Promotion::FloatToDouble => promote_numeric_to!(Float64, get_float, f64),
1413 Promotion::StringToBytes => match self {
1414 Self::Binary(offsets, values) | Self::StringToBytes(offsets, values) => {
1415 let data = buf.get_bytes()?;
1416 offsets.push_length(data.len());
1417 values.extend_from_slice(data);
1418 Ok(())
1419 }
1420 other => Err(AvroError::ParseError(format!(
1421 "Promotion {promotion} target mismatch: expected bytes (Binary/StringToBytes), got {}",
1422 <Self as AsRef<str>>::as_ref(other)
1423 ))),
1424 },
1425 Promotion::BytesToString => match self {
1426 Self::String(offsets, values)
1427 | Self::StringView(offsets, values)
1428 | Self::BytesToString(offsets, values) => {
1429 let data = buf.get_bytes()?;
1430 offsets.push_length(data.len());
1431 values.extend_from_slice(data);
1432 Ok(())
1433 }
1434 other => Err(AvroError::ParseError(format!(
1435 "Promotion {promotion} target mismatch: expected string (String/StringView/BytesToString), got {}",
1436 <Self as AsRef<str>>::as_ref(other)
1437 ))),
1438 },
1439 }
1440 }
1441
1442 fn decode_with_resolution<'d>(
1443 &'d mut self,
1444 buf: &mut AvroCursor<'_>,
1445 resolution: &'d ResolutionPlan,
1446 ) -> Result<(), AvroError> {
1447 #[cfg(feature = "avro_custom_types")]
1448 if let Self::RunEndEncoded(_, len, inner) = self {
1449 *len += 1;
1450 return inner.decode_with_resolution(buf, resolution);
1451 }
1452
1453 match resolution {
1454 ResolutionPlan::Promotion(promotion) => {
1455 let promotion = *promotion;
1456 self.decode_with_promotion(buf, promotion)
1457 }
1458 ResolutionPlan::DefaultValue(lit) => self.append_default(lit),
1459 ResolutionPlan::EnumMapping(res) => {
1460 let Self::Enum(indices, _, _) = self else {
1461 return Err(AvroError::SchemaError(
1462 "enum mapping resolution provided for non-enum decoder".into(),
1463 ));
1464 };
1465 let raw = buf.get_int()?;
1466 let resolved = res.resolve(raw)?;
1467 indices.push(resolved);
1468 Ok(())
1469 }
1470 ResolutionPlan::Record(proj) => {
1471 let Self::Record(_, encodings, _, _) = self else {
1472 return Err(AvroError::SchemaError(
1473 "record projection provided for non-record decoder".into(),
1474 ));
1475 };
1476 proj.project_record(buf, encodings)
1477 }
1478 }
1479 }
1480
1481 fn flush(&mut self, nulls: Option<NullBuffer>) -> Result<ArrayRef, AvroError> {
1483 Ok(match self {
1484 Self::Nullable(_, n, e) => e.flush(n.finish())?,
1485 Self::Null(size) => Arc::new(NullArray::new(std::mem::replace(size, 0))),
1486 Self::Boolean(b) => Arc::new(BooleanArray::new(b.finish(), nulls)),
1487 Self::Int32(values) => Arc::new(flush_primitive::<Int32Type>(values, nulls)),
1488 Self::Date32(values) => Arc::new(flush_primitive::<Date32Type>(values, nulls)),
1489 Self::Int64(values) => Arc::new(flush_primitive::<Int64Type>(values, nulls)),
1490 Self::TimeMillis(values) => {
1491 Arc::new(flush_primitive::<Time32MillisecondType>(values, nulls))
1492 }
1493 Self::TimeMicros(values) => {
1494 Arc::new(flush_primitive::<Time64MicrosecondType>(values, nulls))
1495 }
1496 Self::TimestampMillis(tz, values) => Arc::new(
1497 flush_primitive::<TimestampMillisecondType>(values, nulls)
1498 .with_timezone_opt(tz.as_ref().map(|tz| tz.to_string())),
1499 ),
1500 Self::TimestampMicros(tz, values) => Arc::new(
1501 flush_primitive::<TimestampMicrosecondType>(values, nulls)
1502 .with_timezone_opt(tz.as_ref().map(|tz| tz.to_string())),
1503 ),
1504 Self::TimestampNanos(tz, values) => Arc::new(
1505 flush_primitive::<TimestampNanosecondType>(values, nulls)
1506 .with_timezone_opt(tz.as_ref().map(|tz| tz.to_string())),
1507 ),
1508 #[cfg(feature = "avro_custom_types")]
1509 Self::DurationSecond(values) => {
1510 Arc::new(flush_primitive::<DurationSecondType>(values, nulls))
1511 }
1512 #[cfg(feature = "avro_custom_types")]
1513 Self::DurationMillisecond(values) => {
1514 Arc::new(flush_primitive::<DurationMillisecondType>(values, nulls))
1515 }
1516 #[cfg(feature = "avro_custom_types")]
1517 Self::DurationMicrosecond(values) => {
1518 Arc::new(flush_primitive::<DurationMicrosecondType>(values, nulls))
1519 }
1520 #[cfg(feature = "avro_custom_types")]
1521 Self::DurationNanosecond(values) => {
1522 Arc::new(flush_primitive::<DurationNanosecondType>(values, nulls))
1523 }
1524 #[cfg(feature = "avro_custom_types")]
1525 Self::Int8(values) => Arc::new(flush_primitive::<Int8Type>(values, nulls)),
1526 #[cfg(feature = "avro_custom_types")]
1527 Self::Int16(values) => Arc::new(flush_primitive::<Int16Type>(values, nulls)),
1528 #[cfg(feature = "avro_custom_types")]
1529 Self::UInt8(values) => Arc::new(flush_primitive::<UInt8Type>(values, nulls)),
1530 #[cfg(feature = "avro_custom_types")]
1531 Self::UInt16(values) => Arc::new(flush_primitive::<UInt16Type>(values, nulls)),
1532 #[cfg(feature = "avro_custom_types")]
1533 Self::UInt32(values) => Arc::new(flush_primitive::<UInt32Type>(values, nulls)),
1534 #[cfg(feature = "avro_custom_types")]
1535 Self::UInt64(values) => Arc::new(flush_primitive::<UInt64Type>(values, nulls)),
1536 #[cfg(feature = "avro_custom_types")]
1537 Self::Float16(values) => {
1538 let len = values.len();
1541 let buf: Buffer = std::mem::take(values).into();
1542 let scalar_buf = ScalarBuffer::new(buf, 0, len);
1543 Arc::new(Float16Array::new(scalar_buf, nulls))
1544 }
1545 #[cfg(feature = "avro_custom_types")]
1546 Self::Date64(values) => Arc::new(flush_primitive::<Date64Type>(values, nulls)),
1547 #[cfg(feature = "avro_custom_types")]
1548 Self::TimeNanos(values) => {
1549 Arc::new(flush_primitive::<Time64NanosecondType>(values, nulls))
1550 }
1551 #[cfg(feature = "avro_custom_types")]
1552 Self::Time32Secs(values) => {
1553 Arc::new(flush_primitive::<Time32SecondType>(values, nulls))
1554 }
1555 #[cfg(feature = "avro_custom_types")]
1556 Self::TimestampSecs(is_utc, values) => Arc::new(
1557 flush_primitive::<TimestampSecondType>(values, nulls)
1558 .with_timezone_opt(is_utc.then(|| "+00:00")),
1559 ),
1560 #[cfg(feature = "avro_custom_types")]
1561 Self::IntervalYearMonth(values) => {
1562 Arc::new(flush_primitive::<IntervalYearMonthType>(values, nulls))
1563 }
1564 #[cfg(feature = "avro_custom_types")]
1565 Self::IntervalMonthDayNano(values) => {
1566 Arc::new(flush_primitive::<IntervalMonthDayNanoType>(values, nulls))
1567 }
1568 #[cfg(feature = "avro_custom_types")]
1569 Self::IntervalDayTime(values) => {
1570 Arc::new(flush_primitive::<IntervalDayTimeType>(values, nulls))
1571 }
1572 Self::Float32(values) => Arc::new(flush_primitive::<Float32Type>(values, nulls)),
1573 Self::Float64(values) => Arc::new(flush_primitive::<Float64Type>(values, nulls)),
1574 Self::Int32ToInt64(values) => Arc::new(flush_primitive::<Int64Type>(values, nulls)),
1575 Self::Int32ToFloat32(values) | Self::Int64ToFloat32(values) => {
1576 Arc::new(flush_primitive::<Float32Type>(values, nulls))
1577 }
1578 Self::Int32ToFloat64(values)
1579 | Self::Int64ToFloat64(values)
1580 | Self::Float32ToFloat64(values) => {
1581 Arc::new(flush_primitive::<Float64Type>(values, nulls))
1582 }
1583 Self::StringToBytes(offsets, values) | Self::Binary(offsets, values) => {
1584 let offsets = flush_offsets(offsets);
1585 let values = flush_values(values).into();
1586 Arc::new(BinaryArray::try_new(offsets, values, nulls)?)
1587 }
1588 Self::BytesToString(offsets, values) | Self::String(offsets, values) => {
1589 let offsets = flush_offsets(offsets);
1590 let values = flush_values(values).into();
1591 Arc::new(StringArray::try_new(offsets, values, nulls)?)
1592 }
1593 Self::StringView(offsets, values) => {
1594 let offsets = flush_offsets(offsets);
1595 let values = flush_values(values);
1596 let array = StringArray::try_new(offsets, values.into(), nulls.clone())?;
1597 let values: Vec<&str> = (0..array.len())
1598 .map(|i| {
1599 if array.is_valid(i) {
1600 array.value(i)
1601 } else {
1602 ""
1603 }
1604 })
1605 .collect();
1606 Arc::new(StringViewArray::from(values))
1607 }
1608 Self::Array(field, offsets, values) => {
1609 let values = values.flush(None)?;
1610 let offsets = flush_offsets(offsets);
1611 Arc::new(ListArray::try_new(field.clone(), offsets, values, nulls)?)
1612 }
1613 Self::Record(fields, encodings, _, _) => {
1614 let arrays = encodings
1615 .iter_mut()
1616 .map(|x| x.flush(None))
1617 .collect::<Result<Vec<_>, _>>()?;
1618 Arc::new(StructArray::try_new(fields.clone(), arrays, nulls)?)
1619 }
1620 Self::Map(map_field, k_off, m_off, kdata, valdec) => {
1621 let moff = flush_offsets(m_off);
1622 let koff = flush_offsets(k_off);
1623 let kd = flush_values(kdata).into();
1624 let val_arr = valdec.flush(None)?;
1625 let key_arr = StringArray::try_new(koff, kd, None)?;
1626 if key_arr.len() != val_arr.len() {
1627 return Err(AvroError::InvalidArgument(format!(
1628 "Map keys length ({}) != map values length ({})",
1629 key_arr.len(),
1630 val_arr.len()
1631 )));
1632 }
1633 let final_len = moff.len() - 1;
1634 if let Some(n) = &nulls {
1635 if n.len() != final_len {
1636 return Err(AvroError::InvalidArgument(format!(
1637 "Map array null buffer length {} != final map length {final_len}",
1638 n.len()
1639 )));
1640 }
1641 }
1642 let entries_fields = match map_field.data_type() {
1643 DataType::Struct(fields) => fields.clone(),
1644 other => {
1645 return Err(AvroError::InvalidArgument(format!(
1646 "Map entries field must be a Struct, got {other:?}"
1647 )));
1648 }
1649 };
1650 let entries_struct =
1651 StructArray::try_new(entries_fields, vec![Arc::new(key_arr), val_arr], None)?;
1652 let map_arr =
1653 MapArray::try_new(map_field.clone(), moff, entries_struct, nulls, false)?;
1654 Arc::new(map_arr)
1655 }
1656 Self::Fixed(sz, accum) => {
1657 let b: Buffer = flush_values(accum).into();
1658 let arr = FixedSizeBinaryArray::try_new(*sz, b, nulls)
1659 .map_err(|e| AvroError::ParseError(e.to_string()))?;
1660 Arc::new(arr)
1661 }
1662 Self::Uuid(values) => {
1663 let arr = FixedSizeBinaryArray::try_new(16, std::mem::take(values).into(), nulls)
1664 .map_err(|e| AvroError::ParseError(e.to_string()))?;
1665 Arc::new(arr)
1666 }
1667 #[cfg(feature = "small_decimals")]
1668 Self::Decimal32(precision, scale, _, builder) => {
1669 flush_decimal!(builder, precision, scale, nulls, Decimal32Array)
1670 }
1671 #[cfg(feature = "small_decimals")]
1672 Self::Decimal64(precision, scale, _, builder) => {
1673 flush_decimal!(builder, precision, scale, nulls, Decimal64Array)
1674 }
1675 Self::Decimal128(precision, scale, _, builder) => {
1676 flush_decimal!(builder, precision, scale, nulls, Decimal128Array)
1677 }
1678 Self::Decimal256(precision, scale, _, builder) => {
1679 flush_decimal!(builder, precision, scale, nulls, Decimal256Array)
1680 }
1681 Self::Enum(indices, symbols, _) => flush_dict(indices, symbols, nulls)?,
1682 Self::Duration(builder) => {
1683 let (_, vals, _) = builder.finish().into_parts();
1684 let vals = IntervalMonthDayNanoArray::try_new(vals, nulls)
1685 .map_err(|e| AvroError::ParseError(e.to_string()))?;
1686 Arc::new(vals)
1687 }
1688 #[cfg(feature = "avro_custom_types")]
1689 Self::RunEndEncoded(width, len, inner) => {
1690 let values = inner.flush(nulls)?;
1691 let n = *len;
1692 let arr = values.as_ref();
1693 let mut run_starts: Vec<usize> = Vec::with_capacity(n);
1694 if n > 0 {
1695 run_starts.push(0);
1696 for i in 1..n {
1697 if !values_equal_at(arr, i - 1, i) {
1698 run_starts.push(i);
1699 }
1700 }
1701 }
1702 if n > (u32::MAX as usize) {
1703 return Err(AvroError::InvalidArgument(format!(
1704 "RunEndEncoded length {n} exceeds maximum supported by UInt32 indices for take",
1705 )));
1706 }
1707 let run_count = run_starts.len();
1708 let take_idx: PrimitiveArray<UInt32Type> =
1709 run_starts.iter().map(|&s| s as u32).collect();
1710 let per_run_values = if run_count == 0 {
1711 values.slice(0, 0)
1712 } else {
1713 take(arr, &take_idx, Option::from(TakeOptions::default())).map_err(|e| {
1714 AvroError::ParseError(format!("take() for REE values failed: {e}"))
1715 })?
1716 };
1717
1718 macro_rules! build_run_array {
1719 ($Native:ty, $ArrowTy:ty) => {{
1720 let mut ends: Vec<$Native> = Vec::with_capacity(run_count);
1721 for (idx, &_start) in run_starts.iter().enumerate() {
1722 let end = if idx + 1 < run_count {
1723 run_starts[idx + 1]
1724 } else {
1725 n
1726 };
1727 ends.push(end as $Native);
1728 }
1729 let ends: PrimitiveArray<$ArrowTy> = ends.into_iter().collect();
1730 let run_arr = RunArray::<$ArrowTy>::try_new(&ends, per_run_values.as_ref())
1731 .map_err(|e| AvroError::ParseError(e.to_string()))?;
1732 Arc::new(run_arr) as ArrayRef
1733 }};
1734 }
1735 match *width {
1736 2 => {
1737 if n > i16::MAX as usize {
1738 return Err(AvroError::InvalidArgument(format!(
1739 "RunEndEncoded length {n} exceeds i16::MAX for run end width 2"
1740 )));
1741 }
1742 build_run_array!(i16, Int16Type)
1743 }
1744 4 => build_run_array!(i32, Int32Type),
1745 8 => build_run_array!(i64, Int64Type),
1746 other => {
1747 return Err(AvroError::InvalidArgument(format!(
1748 "Unsupported run-end width {other} for RunEndEncoded"
1749 )));
1750 }
1751 }
1752 }
1753 Self::Union(u) => u.flush(nulls)?,
1754 })
1755 }
1756}
1757
#[derive(Debug)]
/// How a nullable decoder obtains each value and its validity bit.
enum NullablePlan {
    /// A branch tag is read from the data before the value; `nullability` decides
    /// which tag value denotes null.
    ReadTag {
        /// Position of the null branch, used to interpret the tag that was read.
        nullability: Nullability,
        /// Resolution applied when decoding a non-null value.
        resolution: ResolutionPlan,
    },
    /// No tag is read: every value is decoded with `resolution` and marked valid.
    FromSingle { resolution: ResolutionPlan },
}
1770
#[derive(Debug)]
/// Prepared instructions for decoding one value under schema resolution.
enum ResolutionPlan {
    /// Decode the value and convert it with the given promotion.
    Promotion(Promotion),
    /// Append this literal instead of reading anything from the input.
    DefaultValue(AvroLiteral),
    /// Read an enum index and remap it through the stored mapping.
    EnumMapping(EnumResolution),
    /// Decode a record through the stored projector.
    Record(Projector),
}
1783
1784impl ResolutionPlan {
1785 fn try_new(decoder: &Decoder, resolution: &ResolutionInfo) -> Result<Self, AvroError> {
1786 match (decoder, resolution) {
1787 (_, ResolutionInfo::Promotion(p)) => Ok(ResolutionPlan::Promotion(*p)),
1788 (_, ResolutionInfo::DefaultValue(lit)) => Ok(ResolutionPlan::DefaultValue(lit.clone())),
1789 (_, ResolutionInfo::EnumMapping(m)) => {
1790 Ok(ResolutionPlan::EnumMapping(EnumResolution::new(m)))
1791 }
1792 (Decoder::Record(_, _, field_defaults, _), ResolutionInfo::Record(r)) => Ok(
1793 ResolutionPlan::Record(ProjectorBuilder::try_new(r, field_defaults).build()?),
1794 ),
1795 (_, ResolutionInfo::Record(_)) => Err(AvroError::SchemaError(
1796 "record resolution on non-record decoder".into(),
1797 )),
1798 (_, ResolutionInfo::Union(_)) => Err(AvroError::SchemaError(
1799 "union variant cannot be resolved to a union type".into(),
1800 )),
1801 }
1802 }
1803}
1804
/// Remaps enum indices read from the data to the indices stored in the output.
#[derive(Debug)]
struct EnumResolution {
    /// Lookup table indexed by the index read from the data; a negative entry means
    /// "no mapping" and falls back to `default_index`.
    mapping: Arc<[i32]>,
    /// Index used when no mapping applies; a negative value means no default exists,
    /// in which case resolution fails.
    default_index: i32,
}
1810
1811impl EnumResolution {
1812 fn new(mapping: &EnumMapping) -> Self {
1813 EnumResolution {
1814 mapping: mapping.mapping.clone(),
1815 default_index: mapping.default_index,
1816 }
1817 }
1818
1819 fn resolve(&self, index: i32) -> Result<i32, AvroError> {
1820 let resolved = usize::try_from(index)
1821 .ok()
1822 .and_then(|idx| self.mapping.get(idx).copied())
1823 .filter(|&idx| idx >= 0)
1824 .unwrap_or(self.default_index);
1825 if resolved >= 0 {
1826 Ok(resolved)
1827 } else {
1828 Err(AvroError::ParseError(format!(
1829 "Enum symbol index {index} not resolvable and no default provided",
1830 )))
1831 }
1832 }
1833}
1834
#[derive(Debug)]
/// Per-writer-branch lookup of the reader branch and the resolution plan to use.
///
/// Both slices are indexed by writer branch index and have the same length.
struct DispatchLookupTable {
    /// Reader branch index for each writer branch, or `NO_SOURCE` when the writer
    /// branch has no reader counterpart.
    to_reader: Box<[i8]>,
    /// Resolution plan for each writer branch; entries for unmapped branches hold a
    /// placeholder that `resolve` never returns.
    resolution: Box<[ResolutionPlan]>,
}
1860
/// Sentinel stored in `DispatchLookupTable::to_reader` for a writer branch that maps to no reader branch.
const NO_SOURCE: i8 = -1;
1864
1865impl DispatchLookupTable {
1866 fn from_writer_to_reader(
1867 reader_branches: &[Decoder],
1868 resolution_map: &[Option<(usize, ResolutionInfo)>],
1869 ) -> Result<Self, AvroError> {
1870 let mut to_reader = Vec::with_capacity(resolution_map.len());
1871 let mut resolution = Vec::with_capacity(resolution_map.len());
1872 for map in resolution_map {
1873 match map {
1874 Some((idx, res)) => {
1875 let idx = *idx;
1876 let idx_i8 = i8::try_from(idx).map_err(|_| {
1877 AvroError::SchemaError(format!(
1878 "Reader branch index {idx} exceeds i8 range (max {})",
1879 i8::MAX
1880 ))
1881 })?;
1882 let plan = ResolutionPlan::try_new(&reader_branches[idx], res)?;
1883 to_reader.push(idx_i8);
1884 resolution.push(plan);
1885 }
1886 None => {
1887 to_reader.push(NO_SOURCE);
1888 resolution.push(ResolutionPlan::DefaultValue(AvroLiteral::Null));
1889 }
1890 }
1891 }
1892 Ok(Self {
1893 to_reader: to_reader.into_boxed_slice(),
1894 resolution: resolution.into_boxed_slice(),
1895 })
1896 }
1897
1898 #[inline]
1900 fn resolve(&self, writer_index: usize) -> Option<(usize, &ResolutionPlan)> {
1901 let reader_index = *self.to_reader.get(writer_index)?;
1902 (reader_index >= 0).then(|| (reader_index as usize, &self.resolution[writer_index]))
1903 }
1904}
1905
/// Decoder state for a union-typed column.
#[derive(Debug)]
struct UnionDecoder {
    /// Arrow union fields; their type ids become the reader type codes.
    fields: UnionFields,
    /// Per-branch decoders together with the buffers recording which branch each value used.
    branches: UnionDecoderBranches,
    /// Branch index set to 0 by `try_new`; presumably the branch used when a default
    /// value is emitted (usage not visible here, confirm against the decode path).
    default_emit_idx: usize,
    /// Index of the first `Decoder::Null` branch, or `default_emit_idx` when there is none.
    null_emit_idx: usize,
    /// How encoded values are mapped onto the reader branches.
    plan: UnionReadPlan,
}
1914
/// Branch decoders of a union plus the per-value bookkeeping recorded while decoding.
#[derive(Debug, Default)]
struct UnionDecoderBranches {
    /// One decoder per reader branch.
    decoders: Vec<Decoder>,
    /// Type code of each reader branch, indexed like `decoders`.
    reader_type_codes: Vec<i8>,
    /// Type code recorded for every emitted value, in emission order.
    type_ids: Vec<i8>,
    /// For every emitted value, how many values its branch had already received.
    /// NOTE(review): looks like the Arrow dense-union offsets buffer; confirm against flush.
    offsets: Vec<i32>,
    /// Number of values emitted so far to each branch.
    counts: Vec<i32>,
}
1923
1924impl UnionDecoderBranches {
1925 fn new(decoders: Vec<Decoder>, reader_type_codes: Vec<i8>) -> Self {
1926 let branch_len = decoders.len().max(reader_type_codes.len());
1927 Self {
1928 decoders,
1929 reader_type_codes,
1930 type_ids: Vec::with_capacity(DEFAULT_CAPACITY),
1931 offsets: Vec::with_capacity(DEFAULT_CAPACITY),
1932 counts: vec![0; branch_len],
1933 }
1934 }
1935
1936 fn emit_to(&mut self, reader_idx: usize) -> Result<&mut Decoder, AvroError> {
1937 let branches_len = self.decoders.len();
1938 let Some(reader_branch) = self.decoders.get_mut(reader_idx) else {
1939 return Err(AvroError::ParseError(format!(
1940 "Union branch index {reader_idx} out of range ({branches_len} branches)"
1941 )));
1942 };
1943 self.type_ids.push(self.reader_type_codes[reader_idx]);
1944 self.offsets.push(self.counts[reader_idx]);
1945 self.counts[reader_idx] += 1;
1946 Ok(reader_branch)
1947 }
1948}
1949
1950impl Default for UnionDecoder {
1951 fn default() -> Self {
1952 Self {
1953 fields: UnionFields::empty(),
1954 branches: Default::default(),
1955 default_emit_idx: 0,
1956 null_emit_idx: 0,
1957 plan: UnionReadPlan::Passthrough,
1958 }
1959 }
1960}
1961
/// Strategy for mapping encoded values onto the branches of a union decoder.
#[derive(Debug)]
enum UnionReadPlan {
    /// Writer and reader are both unions: writer branch indices are translated
    /// through the lookup table.
    ReaderUnion {
        lookup_table: DispatchLookupTable,
    },
    /// The writer is not a union but the reader is: every value goes to the single
    /// reader branch `reader_idx`, decoded with `resolution`.
    FromSingle {
        reader_idx: usize,
        resolution: ResolutionPlan,
    },
    /// The writer is a union but the reader is a single type, held in `target`.
    /// This variant is not produced by `from_resolved`.
    ToSingle {
        target: Box<Decoder>,
        lookup_table: DispatchLookupTable,
    },
    /// No resolution information was supplied.
    Passthrough,
}
1977
1978impl UnionReadPlan {
1979 fn from_resolved(
1980 reader_branches: &[Decoder],
1981 resolved: Option<ResolvedUnion>,
1982 ) -> Result<Self, AvroError> {
1983 let Some(info) = resolved else {
1984 return Ok(Self::Passthrough);
1985 };
1986 match (info.writer_is_union, info.reader_is_union) {
1987 (true, true) => {
1988 let lookup_table =
1989 DispatchLookupTable::from_writer_to_reader(reader_branches, &info.writer_to_reader)?;
1990 Ok(Self::ReaderUnion { lookup_table })
1991 }
1992 (false, true) => {
1993 let Some((idx, resolution)) =
1994 info.writer_to_reader.first().and_then(Option::as_ref)
1995 else {
1996 return Err(AvroError::SchemaError(
1997 "Writer type does not match any reader union branch".to_string(),
1998 ));
1999 };
2000 let reader_idx = *idx;
2001 Ok(Self::FromSingle {
2002 reader_idx,
2003 resolution: ResolutionPlan::try_new(&reader_branches[reader_idx], resolution)?,
2004 })
2005 }
2006 (true, false) => Err(AvroError::InvalidArgument(
2007 "UnionDecoder::try_new cannot build writer-union to single; use UnionDecoderBuilder with a target"
2008 .to_string(),
2009 )),
2010 _ => Err(AvroError::SchemaError(
2012 "ResolvedUnion constructed for non-union sides; resolver should return None"
2013 .to_string(),
2014 )),
2015 }
2016 }
2017}
2018
2019impl UnionDecoder {
2020 fn try_new(
2021 fields: UnionFields,
2022 branches: Vec<Decoder>,
2023 resolved: Option<ResolvedUnion>,
2024 ) -> Result<Self, AvroError> {
2025 let reader_type_codes = fields.iter().map(|(tid, _)| tid).collect::<Vec<i8>>();
2026 let null_branch = branches.iter().position(|b| matches!(b, Decoder::Null(_)));
2027 let default_emit_idx = 0;
2028 let null_emit_idx = null_branch.unwrap_or(default_emit_idx);
2029 let max_addr = (i32::MAX as usize) + 1;
2031 if branches.len() > max_addr {
2032 return Err(AvroError::SchemaError(format!(
2033 "Reader union has {} branches, which exceeds the maximum addressable \
2034 branches by an Avro int tag ({} + 1).",
2035 branches.len(),
2036 i32::MAX
2037 )));
2038 }
2039 let plan = UnionReadPlan::from_resolved(&branches, resolved)?;
2040 Ok(Self {
2041 fields,
2042 branches: UnionDecoderBranches::new(branches, reader_type_codes),
2043 default_emit_idx,
2044 null_emit_idx,
2045 plan,
2046 })
2047 }
2048
2049 fn with_single_target(target: Decoder, info: ResolvedUnion) -> Result<Self, AvroError> {
2050 debug_assert!(info.writer_is_union && !info.reader_is_union);
2052 let mut reader_branches = [target];
2053 let lookup_table =
2054 DispatchLookupTable::from_writer_to_reader(&reader_branches, &info.writer_to_reader)?;
2055 let target = Box::new(mem::replace(&mut reader_branches[0], Decoder::Null(0)));
2056 Ok(Self {
2057 plan: UnionReadPlan::ToSingle {
2058 target,
2059 lookup_table,
2060 },
2061 ..Self::default()
2062 })
2063 }
2064
2065 #[inline]
2066 fn read_tag(buf: &mut AvroCursor<'_>) -> Result<usize, AvroError> {
2067 let raw = buf.get_long()?;
2072 if raw < 0 {
2073 return Err(AvroError::ParseError(format!(
2074 "Negative union branch index {raw}"
2075 )));
2076 }
2077 usize::try_from(raw).map_err(|_| {
2078 AvroError::ParseError(format!(
2079 "Union branch index {raw} does not fit into usize on this platform ({}-bit)",
2080 (usize::BITS as usize)
2081 ))
2082 })
2083 }
2084
2085 #[inline]
2086 fn on_decoder<F>(&mut self, fallback_idx: usize, action: F) -> Result<(), AvroError>
2087 where
2088 F: FnOnce(&mut Decoder) -> Result<(), AvroError>,
2089 {
2090 if let UnionReadPlan::ToSingle { target, .. } = &mut self.plan {
2091 return action(target);
2092 }
2093 let reader_idx = match &self.plan {
2094 UnionReadPlan::FromSingle { reader_idx, .. } => *reader_idx,
2095 _ => fallback_idx,
2096 };
2097 self.branches.emit_to(reader_idx).and_then(action)
2098 }
2099
2100 fn append_null(&mut self) -> Result<(), AvroError> {
2101 self.on_decoder(self.null_emit_idx, |decoder| decoder.append_null())
2102 }
2103
2104 fn append_default(&mut self, lit: &AvroLiteral) -> Result<(), AvroError> {
2105 self.on_decoder(self.default_emit_idx, |decoder| decoder.append_default(lit))
2106 }
2107
2108 fn decode(&mut self, buf: &mut AvroCursor<'_>) -> Result<(), AvroError> {
2109 match &mut self.plan {
2110 UnionReadPlan::Passthrough => {
2111 let reader_idx = Self::read_tag(buf)?;
2112 let decoder = self.branches.emit_to(reader_idx)?;
2113 decoder.decode(buf)
2114 }
2115 UnionReadPlan::ReaderUnion { lookup_table } => {
2116 let idx = Self::read_tag(buf)?;
2117 let Some((reader_idx, resolution)) = lookup_table.resolve(idx) else {
2118 return Err(AvroError::ParseError(format!(
2119 "Union branch index {idx} not resolvable by reader schema"
2120 )));
2121 };
2122 let decoder = self.branches.emit_to(reader_idx)?;
2123 decoder.decode_with_resolution(buf, resolution)
2124 }
2125 UnionReadPlan::FromSingle {
2126 reader_idx,
2127 resolution,
2128 } => {
2129 let decoder = self.branches.emit_to(*reader_idx)?;
2130 decoder.decode_with_resolution(buf, resolution)
2131 }
2132 UnionReadPlan::ToSingle {
2133 target,
2134 lookup_table,
2135 } => {
2136 let idx = Self::read_tag(buf)?;
2137 let Some((_, resolution)) = lookup_table.resolve(idx) else {
2138 return Err(AvroError::ParseError(format!(
2139 "Writer union branch index {idx} not resolvable by reader schema"
2140 )));
2141 };
2142 target.decode_with_resolution(buf, resolution)
2143 }
2144 }
2145 }
2146
2147 fn flush(&mut self, nulls: Option<NullBuffer>) -> Result<ArrayRef, AvroError> {
2148 if let UnionReadPlan::ToSingle { target, .. } = &mut self.plan {
2149 return target.flush(nulls);
2150 }
2151 debug_assert!(
2152 nulls.is_none(),
2153 "UnionArray does not accept a validity bitmap; \
2154 nulls should have been materialized as a Null child during decode"
2155 );
2156 let children = self
2157 .branches
2158 .decoders
2159 .iter_mut()
2160 .map(|d| d.flush(None))
2161 .collect::<Result<Vec<_>, _>>()?;
2162 let arr = UnionArray::try_new(
2163 self.fields.clone(),
2164 flush_values(&mut self.branches.type_ids)
2165 .into_iter()
2166 .collect(),
2167 Some(
2168 flush_values(&mut self.branches.offsets)
2169 .into_iter()
2170 .collect(),
2171 ),
2172 children,
2173 )
2174 .map_err(|e| AvroError::ParseError(e.to_string()))?;
2175 Ok(Arc::new(arr))
2176 }
2177}
2178
2179#[derive(Debug, Default)]
2180struct UnionDecoderBuilder {
2181 fields: Option<UnionFields>,
2182 branches: Option<Vec<Decoder>>,
2183 resolved: Option<ResolvedUnion>,
2184 target: Option<Decoder>,
2185}
2186
2187impl UnionDecoderBuilder {
2188 fn new() -> Self {
2189 Self::default()
2190 }
2191
2192 fn with_fields(mut self, fields: UnionFields) -> Self {
2193 self.fields = Some(fields);
2194 self
2195 }
2196
2197 fn with_branches(mut self, branches: Vec<Decoder>) -> Self {
2198 self.branches = Some(branches);
2199 self
2200 }
2201
2202 fn with_resolved_union(mut self, resolved_union: ResolvedUnion) -> Self {
2203 self.resolved = Some(resolved_union);
2204 self
2205 }
2206
2207 fn with_target(mut self, target: Decoder) -> Self {
2208 self.target = Some(target);
2209 self
2210 }
2211
2212 fn build(self) -> Result<UnionDecoder, AvroError> {
2213 match (self.resolved, self.fields, self.branches, self.target) {
2214 (resolved, Some(fields), Some(branches), None) => {
2215 UnionDecoder::try_new(fields, branches, resolved)
2216 }
2217 (Some(info), None, None, Some(target))
2218 if info.writer_is_union && !info.reader_is_union =>
2219 {
2220 UnionDecoder::with_single_target(target, info)
2221 }
2222 _ => Err(AvroError::InvalidArgument(
2223 "Invalid UnionDecoderBuilder configuration: expected either \
2224 (fields + branches + resolved) with no target for reader-unions, or \
2225 (resolved + target) with no fields/branches for writer-union to single."
2226 .to_string(),
2227 )),
2228 }
2229 }
2230}
2231
/// Selects what `process_blockwise` does with a block whose item count is
/// negative, i.e. a block that is followed by its size in bytes.
#[derive(Debug, Copy, Clone)]
enum NegativeBlockBehavior {
    /// Read the byte size, then still visit every item with the callback.
    ProcessItems,
    /// Read the byte size and consume that many bytes without visiting items.
    SkipBySize,
}
2237
2238#[inline]
2239fn skip_blocks(
2240 buf: &mut AvroCursor,
2241 mut skip_item: impl FnMut(&mut AvroCursor) -> Result<(), AvroError>,
2242) -> Result<usize, AvroError> {
2243 process_blockwise(
2244 buf,
2245 move |c| skip_item(c),
2246 NegativeBlockBehavior::SkipBySize,
2247 )
2248}
2249
2250#[inline]
2251fn flush_dict(
2252 indices: &mut Vec<i32>,
2253 symbols: &[String],
2254 nulls: Option<NullBuffer>,
2255) -> Result<ArrayRef, AvroError> {
2256 let keys = flush_primitive::<Int32Type>(indices, nulls);
2257 let values = Arc::new(StringArray::from_iter_values(
2258 symbols.iter().map(|s| s.as_str()),
2259 ));
2260 DictionaryArray::try_new(keys, values)
2261 .map_err(Into::into)
2262 .map(|arr| Arc::new(arr) as ArrayRef)
2263}
2264
2265#[inline]
2266fn read_blocks(
2267 buf: &mut AvroCursor,
2268 decode_entry: impl FnMut(&mut AvroCursor) -> Result<(), AvroError>,
2269) -> Result<usize, AvroError> {
2270 process_blockwise(buf, decode_entry, NegativeBlockBehavior::ProcessItems)
2271}
2272
2273#[inline]
2274fn process_blockwise(
2275 buf: &mut AvroCursor,
2276 mut on_item: impl FnMut(&mut AvroCursor) -> Result<(), AvroError>,
2277 negative_behavior: NegativeBlockBehavior,
2278) -> Result<usize, AvroError> {
2279 let mut total = 0usize;
2280 loop {
2281 let block_count = buf.get_long()?;
2286 match block_count.cmp(&0) {
2287 Ordering::Equal => break,
2288 Ordering::Less => {
2289 let count = (-block_count) as usize;
2290 let size_in_bytes = buf.get_long()? as usize;
2292 match negative_behavior {
2293 NegativeBlockBehavior::ProcessItems => {
2294 for _ in 0..count {
2296 on_item(buf)?;
2297 }
2298 }
2299 NegativeBlockBehavior::SkipBySize => {
2300 let _ = buf.get_fixed(size_in_bytes)?;
2302 }
2303 }
2304 total += count;
2305 }
2306 Ordering::Greater => {
2307 let count = block_count as usize;
2308 for _ in 0..count {
2309 on_item(buf)?;
2310 }
2311 total += count;
2312 }
2313 }
2314 }
2315 Ok(total)
2316}
2317
2318#[inline]
2319fn flush_values<T>(values: &mut Vec<T>) -> Vec<T> {
2320 std::mem::replace(values, Vec::with_capacity(DEFAULT_CAPACITY))
2321}
2322
2323#[inline]
2324fn flush_offsets(offsets: &mut OffsetBufferBuilder<i32>) -> OffsetBuffer<i32> {
2325 std::mem::replace(offsets, OffsetBufferBuilder::new(DEFAULT_CAPACITY)).finish()
2326}
2327
2328#[inline]
2329fn flush_primitive<T: ArrowPrimitiveType>(
2330 values: &mut Vec<T::Native>,
2331 nulls: Option<NullBuffer>,
2332) -> PrimitiveArray<T> {
2333 PrimitiveArray::new(flush_values(values).into(), nulls)
2334}
2335
2336#[inline]
2337fn read_decimal_bytes_be<const N: usize>(
2338 buf: &mut AvroCursor<'_>,
2339 size: &Option<usize>,
2340) -> Result<[u8; N], AvroError> {
2341 match size {
2342 Some(n) if *n == N => {
2343 let raw = buf.get_fixed(N)?;
2344 let mut arr = [0u8; N];
2345 arr.copy_from_slice(raw);
2346 Ok(arr)
2347 }
2348 Some(n) => {
2349 let raw = buf.get_fixed(*n)?;
2350 sign_cast_to::<N>(raw)
2351 }
2352 None => {
2353 let raw = buf.get_bytes()?;
2354 sign_cast_to::<N>(raw)
2355 }
2356 }
2357}
2358
2359#[inline]
2368fn sign_cast_to<const N: usize>(raw: &[u8]) -> Result<[u8; N], AvroError> {
2369 let len = raw.len();
2370 if len == N {
2372 let mut out = [0u8; N];
2373 out.copy_from_slice(raw);
2374 return Ok(out);
2375 }
2376 let first = raw.first().copied().unwrap_or(0u8);
2378 let sign_byte = if (first & 0x80) == 0 { 0x00 } else { 0xFF };
2379 let mut out = [sign_byte; N];
2381 if len > N {
2382 let extra = len - N;
2385 if raw[..extra].iter().any(|&b| b != sign_byte) {
2387 return Err(AvroError::ParseError(format!(
2388 "Decimal value with {} bytes cannot be represented in {} bytes without overflow",
2389 len, N
2390 )));
2391 }
2392 if N > 0 {
2393 let first_kept = raw[extra];
2394 let sign_bit_mismatch = ((first_kept ^ sign_byte) & 0x80) != 0;
2395 if sign_bit_mismatch {
2396 return Err(AvroError::ParseError(format!(
2397 "Decimal value with {} bytes cannot be represented in {} bytes without overflow",
2398 len, N
2399 )));
2400 }
2401 }
2402 out.copy_from_slice(&raw[extra..]);
2403 return Ok(out);
2404 }
2405 out[N - len..].copy_from_slice(raw);
2406 Ok(out)
2407}
2408
/// Reports whether the elements at positions `i` and `j` of `arr` are equal.
///
/// Two nulls compare equal, a null never equals a non-null, and two non-null
/// elements are compared through their one-element slices.
#[cfg(feature = "avro_custom_types")]
#[inline]
fn values_equal_at(arr: &dyn Array, i: usize, j: usize) -> bool {
    let (left_null, right_null) = (arr.is_null(i), arr.is_null(j));
    if left_null || right_null {
        return left_null && right_null;
    }
    let left = arr.slice(i, 1);
    let right = arr.slice(j, 1);
    left == right
}
2422
/// Plan for decoding one writer record into the reader's field decoders.
#[derive(Debug)]
struct Projector {
    /// What to do with each writer field, in the order the fields are visited
    /// when a record is projected.
    writer_projections: Vec<FieldProjection>,
    /// `(reader field index, default literal)` pairs appended after the writer
    /// fields of a record have been processed.
    default_injections: Arc<[(usize, AvroLiteral)]>,
}
2428
/// Action taken for a single writer field while projecting a record.
#[derive(Debug)]
enum FieldProjection {
    /// Decode the field into the reader decoder at this index.
    ToReader(usize),
    /// Consume the field's bytes with this skipper without decoding them.
    Skip(Skipper),
}
2434
2435#[derive(Debug)]
2436struct ProjectorBuilder<'a> {
2437 rec: &'a ResolvedRecord,
2438 field_defaults: &'a [Option<AvroLiteral>],
2439}
2440
2441impl<'a> ProjectorBuilder<'a> {
2442 #[inline]
2443 fn try_new(rec: &'a ResolvedRecord, field_defaults: &'a [Option<AvroLiteral>]) -> Self {
2444 Self {
2445 rec,
2446 field_defaults,
2447 }
2448 }
2449
2450 #[inline]
2451 fn build(self) -> Result<Projector, AvroError> {
2452 let mut default_injections: Vec<(usize, AvroLiteral)> =
2453 Vec::with_capacity(self.rec.default_fields.len());
2454 for &idx in self.rec.default_fields.as_ref() {
2455 let lit = self
2456 .field_defaults
2457 .get(idx)
2458 .and_then(|lit| lit.clone())
2459 .unwrap_or(AvroLiteral::Null);
2460 default_injections.push((idx, lit));
2461 }
2462 let writer_projections = self
2463 .rec
2464 .writer_fields
2465 .iter()
2466 .map(|field| match field {
2467 ResolvedField::ToReader(index, _) => Ok(FieldProjection::ToReader(*index)),
2468 ResolvedField::Skip(datatype) => {
2469 let skipper = Skipper::from_avro(datatype)?;
2470 Ok(FieldProjection::Skip(skipper))
2471 }
2472 })
2473 .collect::<Result<_, AvroError>>()?;
2474 Ok(Projector {
2475 writer_projections,
2476 default_injections: default_injections.into(),
2477 })
2478 }
2479}
2480
2481impl Projector {
2482 #[inline]
2483 fn project_record(
2484 &self,
2485 buf: &mut AvroCursor<'_>,
2486 encodings: &mut [Decoder],
2487 ) -> Result<(), AvroError> {
2488 for field_proj in self.writer_projections.iter() {
2489 match field_proj {
2490 FieldProjection::ToReader(index) => encodings[*index].decode(buf)?,
2491 FieldProjection::Skip(skipper) => skipper.skip(buf)?,
2492 }
2493 }
2494 for (reader_index, lit) in self.default_injections.as_ref() {
2495 encodings[*reader_index].append_default(lit)?;
2496 }
2497 Ok(())
2498 }
2499}
2500
/// Describes how to advance the cursor past one encoded value without
/// decoding it. Each variant corresponds to one arm of `Skipper::skip`.
#[derive(Debug)]
enum Skipper {
    /// Nothing is consumed.
    Null,
    /// Consumed by reading one boolean.
    Boolean,
    /// Consumed as one int.
    Int32,
    /// Consumed as one long.
    Int64,
    /// Consumed by reading one float.
    Float32,
    /// Consumed by reading one double.
    Float64,
    /// Consumed as one length-prefixed byte string.
    Bytes,
    /// Consumed as one length-prefixed byte string.
    String,
    /// Consumed as one long.
    TimeMicros,
    /// Consumed as one long.
    TimestampMillis,
    /// Consumed as one long.
    TimestampMicros,
    /// Consumed as one long.
    TimestampNanos,
    /// Consumed as exactly this many bytes.
    Fixed(usize),
    /// Fixed-width when a size is given, otherwise a length-prefixed byte string.
    Decimal(Option<usize>),
    /// Consumed as one length-prefixed byte string.
    UuidString,
    /// Consumed as one int.
    Enum,
    /// Consumed as exactly 12 bytes.
    DurationFixed12,
    /// Block-encoded sequence of items, each skipped with the inner skipper.
    List(Box<Skipper>),
    /// Block-encoded sequence of entries: a length-prefixed key followed by a
    /// value skipped with the inner skipper.
    Map(Box<Skipper>),
    /// Fields skipped one after another in the stored order.
    Struct(Vec<Skipper>),
    /// A branch tag followed by a value skipped with the selected branch's skipper.
    Union(Vec<Skipper>),
    /// A branch marker, then the inner value only when the marker selects the
    /// non-null branch for the given null ordering.
    Nullable(Nullability, Box<Skipper>),
    /// Skipped exactly like the inner value.
    #[cfg(feature = "avro_custom_types")]
    RunEndEncoded(Box<Skipper>),
}
2533
2534impl Skipper {
2535 fn from_avro(dt: &AvroDataType) -> Result<Self, AvroError> {
2536 let mut base = match dt.codec() {
2537 Codec::Null => Self::Null,
2538 Codec::Boolean => Self::Boolean,
2539 Codec::Int32 | Codec::Date32 | Codec::TimeMillis => Self::Int32,
2540 Codec::Int64 => Self::Int64,
2541 Codec::TimeMicros => Self::TimeMicros,
2542 Codec::TimestampMillis(_) => Self::TimestampMillis,
2543 Codec::TimestampMicros(_) => Self::TimestampMicros,
2544 Codec::TimestampNanos(_) => Self::TimestampNanos,
2545 #[cfg(feature = "avro_custom_types")]
2546 Codec::DurationNanos
2547 | Codec::DurationMicros
2548 | Codec::DurationMillis
2549 | Codec::DurationSeconds => Self::Int64,
2550 #[cfg(feature = "avro_custom_types")]
2551 Codec::Int8 | Codec::Int16 | Codec::UInt8 | Codec::UInt16 | Codec::Time32Secs => {
2552 Self::Int32
2553 }
2554 #[cfg(feature = "avro_custom_types")]
2555 Codec::UInt32 | Codec::Date64 | Codec::TimeNanos | Codec::TimestampSecs(_) => {
2556 Self::Int64
2557 }
2558 #[cfg(feature = "avro_custom_types")]
2559 Codec::UInt64 => Self::Fixed(8),
2560 #[cfg(feature = "avro_custom_types")]
2561 Codec::Float16 => Self::Fixed(2),
2562 #[cfg(feature = "avro_custom_types")]
2563 Codec::IntervalYearMonth => Self::Fixed(4),
2564 #[cfg(feature = "avro_custom_types")]
2565 Codec::IntervalMonthDayNano => Self::Fixed(16),
2566 #[cfg(feature = "avro_custom_types")]
2567 Codec::IntervalDayTime => Self::Fixed(8),
2568 Codec::Float32 => Self::Float32,
2569 Codec::Float64 => Self::Float64,
2570 Codec::Binary => Self::Bytes,
2571 Codec::Utf8 | Codec::Utf8View => Self::String,
2572 Codec::Fixed(sz) => Self::Fixed(*sz as usize),
2573 Codec::Decimal(_, _, size) => Self::Decimal(*size),
2574 Codec::Uuid => Self::UuidString, Codec::Enum(_) => Self::Enum,
2576 Codec::List(item) => Self::List(Box::new(Skipper::from_avro(item)?)),
2577 Codec::Struct(fields) => {
2578 if let Some(ResolutionInfo::Record(rec)) = dt.resolution.as_ref() {
2579 Self::Struct(
2580 rec.writer_fields
2581 .iter()
2582 .map(|wf| match wf {
2583 ResolvedField::ToReader(_, wdt) | ResolvedField::Skip(wdt) => {
2584 Skipper::from_avro(wdt)
2585 }
2586 })
2587 .collect::<Result<_, _>>()?,
2588 )
2589 } else {
2590 Self::Struct(
2591 fields
2592 .iter()
2593 .map(|f| Skipper::from_avro(f.data_type()))
2594 .collect::<Result<_, _>>()?,
2595 )
2596 }
2597 }
2598 Codec::Map(values) => Self::Map(Box::new(Skipper::from_avro(values)?)),
2599 Codec::Interval => Self::DurationFixed12,
2600 Codec::Union(encodings, _, _) => {
2601 let max_addr = (i32::MAX as usize) + 1;
2602 if encodings.len() > max_addr {
2603 return Err(AvroError::SchemaError(format!(
2604 "Writer union has {} branches, which exceeds the maximum addressable \
2605 branches by an Avro int tag ({} + 1).",
2606 encodings.len(),
2607 i32::MAX
2608 )));
2609 }
2610 Self::Union(
2611 encodings
2612 .iter()
2613 .map(Skipper::from_avro)
2614 .collect::<Result<_, _>>()?,
2615 )
2616 }
2617 #[cfg(feature = "avro_custom_types")]
2618 Codec::RunEndEncoded(inner, _w) => {
2619 Self::RunEndEncoded(Box::new(Skipper::from_avro(inner)?))
2620 }
2621 };
2622 if let Some(n) = dt.nullability() {
2623 base = Self::Nullable(n, Box::new(base));
2624 }
2625 Ok(base)
2626 }
2627
2628 fn skip(&self, buf: &mut AvroCursor<'_>) -> Result<(), AvroError> {
2629 match self {
2630 Self::Null => Ok(()),
2631 Self::Boolean => {
2632 buf.get_bool()?;
2633 Ok(())
2634 }
2635 Self::Int32 => {
2636 buf.skip_int()?;
2637 Ok(())
2638 }
2639 Self::Int64
2640 | Self::TimeMicros
2641 | Self::TimestampMillis
2642 | Self::TimestampMicros
2643 | Self::TimestampNanos => {
2644 buf.skip_long()?;
2645 Ok(())
2646 }
2647 Self::Float32 => {
2648 buf.get_float()?;
2649 Ok(())
2650 }
2651 Self::Float64 => {
2652 buf.get_double()?;
2653 Ok(())
2654 }
2655 Self::Bytes | Self::String | Self::UuidString => {
2656 buf.get_bytes()?;
2657 Ok(())
2658 }
2659 Self::Fixed(sz) => {
2660 buf.get_fixed(*sz)?;
2661 Ok(())
2662 }
2663 Self::Decimal(size) => {
2664 if let Some(s) = size {
2665 buf.get_fixed(*s)
2666 } else {
2667 buf.get_bytes()
2668 }?;
2669 Ok(())
2670 }
2671 Self::Enum => {
2672 buf.skip_int()?;
2673 Ok(())
2674 }
2675 Self::DurationFixed12 => {
2676 buf.get_fixed(12)?;
2677 Ok(())
2678 }
2679 Self::List(item) => {
2680 skip_blocks(buf, |c| item.skip(c))?;
2681 Ok(())
2682 }
2683 Self::Map(value) => {
2684 skip_blocks(buf, |c| {
2685 c.get_bytes()?; value.skip(c)
2687 })?;
2688 Ok(())
2689 }
2690 Self::Struct(fields) => {
2691 for f in fields.iter() {
2692 f.skip(buf)?
2693 }
2694 Ok(())
2695 }
2696 Self::Union(encodings) => {
2697 let raw = buf.get_long()?;
2699 if raw < 0 {
2700 return Err(AvroError::ParseError(format!(
2701 "Negative union branch index {raw}"
2702 )));
2703 }
2704 let idx: usize = usize::try_from(raw).map_err(|_| {
2705 AvroError::ParseError(format!(
2706 "Union branch index {raw} does not fit into usize on this platform ({}-bit)",
2707 (usize::BITS as usize)
2708 ))
2709 })?;
2710 let Some(encoding) = encodings.get(idx) else {
2711 return Err(AvroError::ParseError(format!(
2712 "Union branch index {idx} out of range for skipper ({} branches)",
2713 encodings.len()
2714 )));
2715 };
2716 encoding.skip(buf)
2717 }
2718 Self::Nullable(order, inner) => {
2719 let branch = buf.read_vlq()?;
2720 let is_not_null = match *order {
2721 Nullability::NullFirst => branch != 0,
2722 Nullability::NullSecond => branch == 0,
2723 };
2724 if is_not_null {
2725 inner.skip(buf)?;
2726 }
2727 Ok(())
2728 }
2729 #[cfg(feature = "avro_custom_types")]
2730 Self::RunEndEncoded(inner) => inner.skip(buf),
2731 }
2732 }
2733}
2734
2735#[cfg(test)]
2736mod tests {
2737 use super::*;
2738 use crate::codec::AvroFieldBuilder;
2739 use crate::schema::{Attributes, ComplexType, Field, PrimitiveType, Record, Schema, TypeName};
2740 use arrow_array::cast::AsArray;
2741 use indexmap::IndexMap;
2742 use std::collections::HashMap;
2743
2744 fn encode_avro_int(value: i32) -> Vec<u8> {
2745 let mut buf = Vec::new();
2746 let mut v = (value << 1) ^ (value >> 31);
2747 while v & !0x7F != 0 {
2748 buf.push(((v & 0x7F) | 0x80) as u8);
2749 v >>= 7;
2750 }
2751 buf.push(v as u8);
2752 buf
2753 }
2754
2755 fn encode_avro_long(value: i64) -> Vec<u8> {
2756 let mut buf = Vec::new();
2757 let mut v = (value << 1) ^ (value >> 63);
2758 while v & !0x7F != 0 {
2759 buf.push(((v & 0x7F) | 0x80) as u8);
2760 v >>= 7;
2761 }
2762 buf.push(v as u8);
2763 buf
2764 }
2765
2766 fn encode_avro_bytes(bytes: &[u8]) -> Vec<u8> {
2767 let mut buf = encode_avro_long(bytes.len() as i64);
2768 buf.extend_from_slice(bytes);
2769 buf
2770 }
2771
2772 fn avro_from_codec(codec: Codec) -> AvroDataType {
2773 AvroDataType::new(codec, Default::default(), None)
2774 }
2775
2776 fn resolved_root_datatype(
2777 writer: Schema<'static>,
2778 reader: Schema<'static>,
2779 use_utf8view: bool,
2780 strict_mode: bool,
2781 ) -> AvroDataType {
2782 let writer_record = Schema::Complex(ComplexType::Record(Record {
2784 name: "Root",
2785 namespace: None,
2786 doc: None,
2787 aliases: vec![],
2788 fields: vec![Field {
2789 name: "v",
2790 r#type: writer,
2791 default: None,
2792 doc: None,
2793 aliases: vec![],
2794 }],
2795 attributes: Attributes::default(),
2796 }));
2797
2798 let reader_record = Schema::Complex(ComplexType::Record(Record {
2800 name: "Root",
2801 namespace: None,
2802 doc: None,
2803 aliases: vec![],
2804 fields: vec![Field {
2805 name: "v",
2806 r#type: reader,
2807 default: None,
2808 doc: None,
2809 aliases: vec![],
2810 }],
2811 attributes: Attributes::default(),
2812 }));
2813
2814 let field = AvroFieldBuilder::new(&writer_record)
2816 .with_reader_schema(&reader_record)
2817 .with_utf8view(use_utf8view)
2818 .with_strict_mode(strict_mode)
2819 .build()
2820 .expect("schema resolution should succeed");
2821
2822 match field.data_type().codec() {
2823 Codec::Struct(fields) => fields[0].data_type().clone(),
2824 other => panic!("expected wrapper struct, got {other:?}"),
2825 }
2826 }
2827
2828 fn decoder_for_promotion(
2829 writer: PrimitiveType,
2830 reader: PrimitiveType,
2831 use_utf8view: bool,
2832 ) -> Decoder {
2833 let ws = Schema::TypeName(TypeName::Primitive(writer));
2834 let rs = Schema::TypeName(TypeName::Primitive(reader));
2835 let dt = resolved_root_datatype(ws, rs, use_utf8view, false);
2836 Decoder::try_new(&dt).unwrap()
2837 }
2838
2839 fn make_avro_dt(codec: Codec, nullability: Option<Nullability>) -> AvroDataType {
2840 AvroDataType::new(codec, HashMap::new(), nullability)
2841 }
2842
2843 #[cfg(feature = "avro_custom_types")]
2844 fn encode_vlq_u64(mut x: u64) -> Vec<u8> {
2845 let mut out = Vec::with_capacity(10);
2846 while x >= 0x80 {
2847 out.push((x as u8) | 0x80);
2848 x >>= 7;
2849 }
2850 out.push(x as u8);
2851 out
2852 }
2853
2854 #[test]
2855 fn test_union_resolution_writer_union_reader_union_reorder_and_promotion_dense() {
2856 let ws = Schema::Union(vec![
2857 Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
2858 Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2859 ]);
2860 let rs = Schema::Union(vec![
2861 Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2862 Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
2863 ]);
2864
2865 let dt = resolved_root_datatype(ws, rs, false, false);
2866 let mut dec = Decoder::try_new(&dt).unwrap();
2867
2868 let mut rec1 = encode_avro_long(0);
2869 rec1.extend(encode_avro_int(7));
2870 let mut cur1 = AvroCursor::new(&rec1);
2871 dec.decode(&mut cur1).unwrap();
2872
2873 let mut rec2 = encode_avro_long(1);
2874 rec2.extend(encode_avro_bytes("abc".as_bytes()));
2875 let mut cur2 = AvroCursor::new(&rec2);
2876 dec.decode(&mut cur2).unwrap();
2877
2878 let arr = dec.flush(None).unwrap();
2879 let ua = arr
2880 .as_any()
2881 .downcast_ref::<UnionArray>()
2882 .expect("dense union output");
2883
2884 assert_eq!(
2885 ua.type_id(0),
2886 1,
2887 "first value must select reader 'long' branch"
2888 );
2889 assert_eq!(ua.value_offset(0), 0);
2890
2891 assert_eq!(
2892 ua.type_id(1),
2893 0,
2894 "second value must select reader 'string' branch"
2895 );
2896 assert_eq!(ua.value_offset(1), 0);
2897
2898 let long_child = ua.child(1).as_any().downcast_ref::<Int64Array>().unwrap();
2899 assert_eq!(long_child.len(), 1);
2900 assert_eq!(long_child.value(0), 7);
2901
2902 let str_child = ua.child(0).as_any().downcast_ref::<StringArray>().unwrap();
2903 assert_eq!(str_child.len(), 1);
2904 assert_eq!(str_child.value(0), "abc");
2905 }
2906
2907 #[test]
2908 fn test_union_resolution_writer_union_reader_nonunion_promotion_int_to_long() {
2909 let ws = Schema::Union(vec![
2910 Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
2911 Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2912 ]);
2913 let rs = Schema::TypeName(TypeName::Primitive(PrimitiveType::Long));
2914
2915 let dt = resolved_root_datatype(ws, rs, false, false);
2916 let mut dec = Decoder::try_new(&dt).unwrap();
2917
2918 let mut data = encode_avro_long(0);
2919 data.extend(encode_avro_int(5));
2920 let mut cur = AvroCursor::new(&data);
2921 dec.decode(&mut cur).unwrap();
2922
2923 let arr = dec.flush(None).unwrap();
2924 let out = arr.as_any().downcast_ref::<Int64Array>().unwrap();
2925 assert_eq!(out.len(), 1);
2926 assert_eq!(out.value(0), 5);
2927 }
2928
2929 #[test]
2930 fn test_union_resolution_writer_union_reader_nonunion_mismatch_errors() {
2931 let ws = Schema::Union(vec![
2932 Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
2933 Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2934 ]);
2935 let rs = Schema::TypeName(TypeName::Primitive(PrimitiveType::Long));
2936
2937 let dt = resolved_root_datatype(ws, rs, false, false);
2938 let mut dec = Decoder::try_new(&dt).unwrap();
2939
2940 let mut data = encode_avro_long(1);
2941 data.extend(encode_avro_bytes("z".as_bytes()));
2942 let mut cur = AvroCursor::new(&data);
2943 let res = dec.decode(&mut cur);
2944 assert!(
2945 res.is_err(),
2946 "expected error when writer union branch does not resolve to reader non-union type"
2947 );
2948 }
2949
2950 #[test]
2951 fn test_union_resolution_writer_nonunion_reader_union_selects_matching_branch() {
2952 let ws = Schema::TypeName(TypeName::Primitive(PrimitiveType::Int));
2953 let rs = Schema::Union(vec![
2954 Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2955 Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
2956 ]);
2957
2958 let dt = resolved_root_datatype(ws, rs, false, false);
2959 let mut dec = Decoder::try_new(&dt).unwrap();
2960
2961 let data = encode_avro_int(6);
2962 let mut cur = AvroCursor::new(&data);
2963 dec.decode(&mut cur).unwrap();
2964
2965 let arr = dec.flush(None).unwrap();
2966 let ua = arr
2967 .as_any()
2968 .downcast_ref::<UnionArray>()
2969 .expect("dense union output");
2970 assert_eq!(ua.len(), 1);
2971 assert_eq!(
2972 ua.type_id(0),
2973 1,
2974 "must resolve to reader 'long' branch (type_id 1)"
2975 );
2976 assert_eq!(ua.value_offset(0), 0);
2977
2978 let long_child = ua.child(1).as_any().downcast_ref::<Int64Array>().unwrap();
2979 assert_eq!(long_child.len(), 1);
2980 assert_eq!(long_child.value(0), 6);
2981
2982 let str_child = ua.child(0).as_any().downcast_ref::<StringArray>().unwrap();
2983 assert_eq!(str_child.len(), 0, "string branch must be empty");
2984 }
2985
2986 #[test]
2987 fn test_union_resolution_writer_union_reader_union_unmapped_branch_errors() {
2988 let ws = Schema::Union(vec![
2989 Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
2990 Schema::TypeName(TypeName::Primitive(PrimitiveType::Boolean)),
2991 ]);
2992 let rs = Schema::Union(vec![
2993 Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2994 Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
2995 ]);
2996
2997 let dt = resolved_root_datatype(ws, rs, false, false);
2998 let mut dec = Decoder::try_new(&dt).unwrap();
2999
3000 let mut data = encode_avro_long(1);
3001 data.push(1);
3002 let mut cur = AvroCursor::new(&data);
3003 let res = dec.decode(&mut cur);
3004 assert!(
3005 res.is_err(),
3006 "expected error for unmapped writer 'boolean' branch"
3007 );
3008 }
3009
3010 #[test]
3011 fn test_schema_resolution_promotion_int_to_long() {
3012 let mut dec = decoder_for_promotion(PrimitiveType::Int, PrimitiveType::Long, false);
3013 assert!(matches!(dec, Decoder::Int32ToInt64(_)));
3014 for v in [0, 1, -2, 123456] {
3015 let data = encode_avro_int(v);
3016 let mut cur = AvroCursor::new(&data);
3017 dec.decode(&mut cur).unwrap();
3018 }
3019 let arr = dec.flush(None).unwrap();
3020 let a = arr.as_any().downcast_ref::<Int64Array>().unwrap();
3021 assert_eq!(a.value(0), 0);
3022 assert_eq!(a.value(1), 1);
3023 assert_eq!(a.value(2), -2);
3024 assert_eq!(a.value(3), 123456);
3025 }
3026
3027 #[test]
3028 fn test_schema_resolution_promotion_int_to_float() {
3029 let mut dec = decoder_for_promotion(PrimitiveType::Int, PrimitiveType::Float, false);
3030 assert!(matches!(dec, Decoder::Int32ToFloat32(_)));
3031 for v in [0, 42, -7] {
3032 let data = encode_avro_int(v);
3033 let mut cur = AvroCursor::new(&data);
3034 dec.decode(&mut cur).unwrap();
3035 }
3036 let arr = dec.flush(None).unwrap();
3037 let a = arr.as_any().downcast_ref::<Float32Array>().unwrap();
3038 assert_eq!(a.value(0), 0.0);
3039 assert_eq!(a.value(1), 42.0);
3040 assert_eq!(a.value(2), -7.0);
3041 }
3042
3043 #[test]
3044 fn test_schema_resolution_promotion_int_to_double() {
3045 let mut dec = decoder_for_promotion(PrimitiveType::Int, PrimitiveType::Double, false);
3046 assert!(matches!(dec, Decoder::Int32ToFloat64(_)));
3047 for v in [1, -1, 10_000] {
3048 let data = encode_avro_int(v);
3049 let mut cur = AvroCursor::new(&data);
3050 dec.decode(&mut cur).unwrap();
3051 }
3052 let arr = dec.flush(None).unwrap();
3053 let a = arr.as_any().downcast_ref::<Float64Array>().unwrap();
3054 assert_eq!(a.value(0), 1.0);
3055 assert_eq!(a.value(1), -1.0);
3056 assert_eq!(a.value(2), 10_000.0);
3057 }
3058
3059 #[test]
3060 fn test_schema_resolution_promotion_long_to_float() {
3061 let mut dec = decoder_for_promotion(PrimitiveType::Long, PrimitiveType::Float, false);
3062 assert!(matches!(dec, Decoder::Int64ToFloat32(_)));
3063 for v in [0_i64, 1_000_000_i64, -123_i64] {
3064 let data = encode_avro_long(v);
3065 let mut cur = AvroCursor::new(&data);
3066 dec.decode(&mut cur).unwrap();
3067 }
3068 let arr = dec.flush(None).unwrap();
3069 let a = arr.as_any().downcast_ref::<Float32Array>().unwrap();
3070 assert_eq!(a.value(0), 0.0);
3071 assert_eq!(a.value(1), 1_000_000.0);
3072 assert_eq!(a.value(2), -123.0);
3073 }
3074
3075 #[test]
3076 fn test_schema_resolution_promotion_long_to_double() {
3077 let mut dec = decoder_for_promotion(PrimitiveType::Long, PrimitiveType::Double, false);
3078 assert!(matches!(dec, Decoder::Int64ToFloat64(_)));
3079 for v in [2_i64, -2_i64, 9_223_372_i64] {
3080 let data = encode_avro_long(v);
3081 let mut cur = AvroCursor::new(&data);
3082 dec.decode(&mut cur).unwrap();
3083 }
3084 let arr = dec.flush(None).unwrap();
3085 let a = arr.as_any().downcast_ref::<Float64Array>().unwrap();
3086 assert_eq!(a.value(0), 2.0);
3087 assert_eq!(a.value(1), -2.0);
3088 assert_eq!(a.value(2), 9_223_372.0);
3089 }
3090
/// A writer `float` resolved against a reader `double` must select the
/// `Float32ToFloat64` decoder and flush a `Float64Array` with the widened values.
#[test]
fn test_schema_resolution_promotion_float_to_double() {
    let mut decoder = decoder_for_promotion(PrimitiveType::Float, PrimitiveType::Double, false);
    assert!(matches!(decoder, Decoder::Float32ToFloat64(_)));
    let singles = [0.5_f32, -3.25_f32, 1.0e6_f32];
    for single in singles {
        // The input buffer is simply the float's four little-endian bytes.
        let encoded = single.to_le_bytes();
        let mut cursor = AvroCursor::new(&encoded);
        decoder.decode(&mut cursor).unwrap();
    }
    let flushed = decoder.flush(None).unwrap();
    let doubles = flushed.as_any().downcast_ref::<Float64Array>().unwrap();
    for (row, want) in [0.5_f64, -3.25_f64, 1.0e6_f64].into_iter().enumerate() {
        assert_eq!(doubles.value(row), want);
    }
}
3106
/// A writer `bytes` resolved against a reader `string` must select the
/// `BytesToString` decoder and flush a `StringArray`, including non-ASCII text.
#[test]
fn test_schema_resolution_promotion_bytes_to_string_utf8() {
    let mut decoder = decoder_for_promotion(PrimitiveType::Bytes, PrimitiveType::String, false);
    assert!(matches!(decoder, Decoder::BytesToString(_, _)));
    let words = ["hello", "world", "héllo"];
    for word in words {
        let encoded = encode_avro_bytes(word.as_bytes());
        decoder.decode(&mut AvroCursor::new(&encoded)).unwrap();
    }
    let flushed = decoder.flush(None).unwrap();
    let strings = flushed.as_any().downcast_ref::<StringArray>().unwrap();
    // The decoded rows must round-trip the original words in order.
    for (row, word) in words.into_iter().enumerate() {
        assert_eq!(strings.value(row), word);
    }
}
3122
/// Same `bytes` -> `string` promotion, but requested with the utf8view flag set:
/// the test expects the `BytesToString` decoder and a `StringArray` result here too.
#[test]
fn test_schema_resolution_promotion_bytes_to_string_utf8view_enabled() {
    let mut decoder = decoder_for_promotion(PrimitiveType::Bytes, PrimitiveType::String, true);
    assert!(matches!(decoder, Decoder::BytesToString(_, _)));
    let encoded = encode_avro_bytes("abc".as_bytes());
    decoder.decode(&mut AvroCursor::new(&encoded)).unwrap();
    let flushed = decoder.flush(None).unwrap();
    let strings = flushed.as_any().downcast_ref::<StringArray>().unwrap();
    assert_eq!(strings.value(0), "abc");
}
3134
/// A writer `string` resolved against a reader `bytes` must select the
/// `StringToBytes` decoder and flush a `BinaryArray` with the raw UTF-8 bytes.
#[test]
fn test_schema_resolution_promotion_string_to_bytes() {
    let mut decoder = decoder_for_promotion(PrimitiveType::String, PrimitiveType::Bytes, false);
    assert!(matches!(decoder, Decoder::StringToBytes(_, _)));
    // The empty string is included to cover a zero-length payload.
    let texts = ["", "abc", "data"];
    for text in texts {
        let encoded = encode_avro_bytes(text.as_bytes());
        let mut cursor = AvroCursor::new(&encoded);
        decoder.decode(&mut cursor).unwrap();
    }
    let flushed = decoder.flush(None).unwrap();
    let binary = flushed.as_any().downcast_ref::<BinaryArray>().unwrap();
    for (row, text) in texts.into_iter().enumerate() {
        assert_eq!(binary.value(row), text.as_bytes());
    }
}
3150
/// When writer and reader both declare field `v` as `int`, the resolved field type
/// must build a plain `Decoder::Int32` and decoded values must come out unchanged.
#[test]
fn test_schema_resolution_no_promotion_passthrough_int() {
    // Reader and writer are structurally identical single-field records.
    let reader_record = Schema::Complex(ComplexType::Record(Record {
        name: "Root",
        namespace: None,
        doc: None,
        aliases: vec![],
        fields: vec![Field {
            name: "v",
            r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
            default: None,
            doc: None,
            aliases: vec![],
        }],
        attributes: Attributes::default(),
    }));
    let writer_record = Schema::Complex(ComplexType::Record(Record {
        name: "Root",
        namespace: None,
        doc: None,
        aliases: vec![],
        fields: vec![Field {
            name: "v",
            r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
            default: None,
            doc: None,
            aliases: vec![],
        }],
        attributes: Attributes::default(),
    }));
    let resolved = AvroFieldBuilder::new(&writer_record)
        .with_reader_schema(&reader_record)
        .with_utf8view(false)
        .with_strict_mode(false)
        .build()
        .unwrap();
    // Unwrap the record to reach the data type of its only field.
    let value_type = match resolved.data_type().codec() {
        Codec::Struct(children) => children[0].data_type().clone(),
        other => panic!("expected wrapper struct, got {other:?}"),
    };
    let mut decoder = Decoder::try_new(&value_type).unwrap();
    assert!(matches!(decoder, Decoder::Int32(_)));
    let inputs = [7, -9];
    for int in inputs {
        let encoded = encode_avro_int(int);
        decoder.decode(&mut AvroCursor::new(&encoded)).unwrap();
    }
    let flushed = decoder.flush(None).unwrap();
    let ints = flushed.as_any().downcast_ref::<Int32Array>().unwrap();
    for (row, want) in inputs.into_iter().enumerate() {
        assert_eq!(ints.value(row), want);
    }
}
3207
/// Resolving a writer `int` field against a reader `boolean` field must fail
/// when the field is built.
#[test]
fn test_schema_resolution_illegal_promotion_int_to_boolean_errors() {
    let reader_record = Schema::Complex(ComplexType::Record(Record {
        name: "Root",
        namespace: None,
        doc: None,
        aliases: vec![],
        fields: vec![Field {
            name: "v",
            r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::Boolean)),
            default: None,
            doc: None,
            aliases: vec![],
        }],
        attributes: Attributes::default(),
    }));
    let writer_record = Schema::Complex(ComplexType::Record(Record {
        name: "Root",
        namespace: None,
        doc: None,
        aliases: vec![],
        fields: vec![Field {
            name: "v",
            r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
            default: None,
            doc: None,
            aliases: vec![],
        }],
        attributes: Attributes::default(),
    }));
    let outcome = AvroFieldBuilder::new(&writer_record)
        .with_reader_schema(&reader_record)
        .with_utf8view(false)
        .with_strict_mode(false)
        .build();
    assert!(outcome.is_err(), "expected error for illegal promotion");
}
3247
/// Decodes a map row containing the single entry `"hello" -> "world"` and checks
/// the resulting `MapArray` and its key/value string columns.
#[test]
fn test_map_decoding_one_entry() {
    let map_type = avro_from_codec(Codec::Map(Arc::new(avro_from_codec(Codec::Utf8))));
    let mut decoder = Decoder::try_new(&map_type).unwrap();
    // Block count 1, the key, the value, then a zero count closing the map.
    let data: Vec<u8> = [
        encode_avro_long(1),
        encode_avro_bytes(b"hello"),
        encode_avro_bytes(b"world"),
        encode_avro_long(0),
    ]
    .concat();
    decoder.decode(&mut AvroCursor::new(&data)).unwrap();
    let flushed = decoder.flush(None).unwrap();
    let maps = flushed.as_any().downcast_ref::<MapArray>().unwrap();
    assert_eq!(maps.len(), 1);
    assert_eq!(maps.value_length(0), 1);
    let first_row = maps.value(0);
    let entries = first_row.as_any().downcast_ref::<StructArray>().unwrap();
    assert_eq!(entries.len(), 1);
    // Both entry columns are string arrays; check each against its expected text.
    for (column, want) in [("key", "hello"), ("value", "world")] {
        let strings = entries
            .column_by_name(column)
            .unwrap()
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert_eq!(strings.value(0), want);
    }
}
3283
/// A map encoded as just a zero block count must decode to one row with no entries.
#[test]
fn test_map_decoding_empty() {
    let map_type = avro_from_codec(Codec::Map(Arc::new(avro_from_codec(Codec::Utf8))));
    let mut decoder = Decoder::try_new(&map_type).unwrap();
    let zero_count = encode_avro_long(0);
    let mut cursor = AvroCursor::new(&zero_count);
    decoder.decode(&mut cursor).unwrap();
    let flushed = decoder.flush(None).unwrap();
    let maps = flushed.as_any().downcast_ref::<MapArray>().unwrap();
    assert_eq!(maps.len(), 1);
    assert_eq!(maps.value_length(0), 0);
}
3296
/// Decodes two 3-byte `fixed` values from separate buffers and checks cursor
/// advancement plus the contents of the flushed `FixedSizeBinaryArray`.
#[test]
fn test_fixed_decoding() {
    let fixed3 = avro_from_codec(Codec::Fixed(3));
    let mut decoder = Decoder::try_new(&fixed3).expect("Failed to create decoder");

    // First value: the cursor must move by exactly the fixed width.
    let first = [1u8, 2, 3];
    let mut first_cursor = AvroCursor::new(&first);
    let first_result = decoder.decode(&mut first_cursor);
    first_result.expect("Failed to decode data1");
    assert_eq!(
        first_cursor.position(),
        3,
        "Cursor should advance by fixed size"
    );

    // Second value, decoded from an independent buffer.
    let second = [4u8, 5, 6];
    let mut second_cursor = AvroCursor::new(&second);
    let second_result = decoder.decode(&mut second_cursor);
    second_result.expect("Failed to decode data2");
    assert_eq!(
        second_cursor.position(),
        3,
        "Cursor should advance by fixed size"
    );

    let flushed = decoder.flush(None).expect("Failed to flush decoder");
    assert_eq!(flushed.len(), 2, "Array should contain two items");
    let fixed = flushed
        .as_any()
        .downcast_ref::<FixedSizeBinaryArray>()
        .expect("Failed to downcast to FixedSizeBinaryArray");
    assert_eq!(
        fixed.value_length(),
        3,
        "Fixed size of binary values should be 3"
    );
    assert_eq!(fixed.value(0), &[1, 2, 3], "First item mismatch");
    assert_eq!(fixed.value(1), &[4, 5, 6], "Second item mismatch");
}
3336
/// Flushing a `fixed(5)` decoder that never decoded anything must yield an empty
/// `FixedSizeBinaryArray` that still reports a value length of 5.
#[test]
fn test_fixed_decoding_empty() {
    let fixed5 = avro_from_codec(Codec::Fixed(5));
    let mut decoder = Decoder::try_new(&fixed5).expect("Failed to create decoder");

    let flushed = decoder.flush(None);
    let flushed = flushed.expect("Failed to flush decoder for empty input");
    assert_eq!(flushed.len(), 0, "Array should be empty");

    let fixed = flushed
        .as_any()
        .downcast_ref::<FixedSizeBinaryArray>()
        .expect("Failed to downcast to FixedSizeBinaryArray for empty array");
    // The width comes from the type, not from any decoded value.
    assert_eq!(
        fixed.value_length(),
        5,
        "Fixed size of binary values should be 5 as per type"
    );
}
3358
/// Decodes a UUID supplied as its hyphenated text form and expects a single
/// 16-byte `FixedSizeBinaryArray` value holding the UUID's raw bytes.
#[test]
fn test_uuid_decoding() {
    let uuid_type = avro_from_codec(Codec::Uuid);
    let mut decoder = Decoder::try_new(&uuid_type).expect("Failed to create decoder");
    let text = "f81d4fae-7dec-11d0-a765-00a0c91e6bf6";
    // Raw bytes corresponding to the hex digits of `text`.
    let want = [
        0xf8, 0x1d, 0x4f, 0xae, 0x7d, 0xec, 0x11, 0xd0, 0xa7, 0x65, 0x00, 0xa0, 0xc9, 0x1e,
        0x6b, 0xf6,
    ];
    let encoded = encode_avro_bytes(text.as_bytes());
    let mut cursor = AvroCursor::new(&encoded);
    decoder.decode(&mut cursor).expect("Failed to decode data");
    // The whole length-prefixed string must have been consumed.
    assert_eq!(
        cursor.position(),
        encoded.len(),
        "Cursor should advance by varint size + data size"
    );
    let flushed = decoder.flush(None).expect("Failed to flush decoder");
    let fixed = flushed
        .as_any()
        .downcast_ref::<FixedSizeBinaryArray>()
        .expect("Array should be a FixedSizeBinaryArray");
    assert_eq!(fixed.len(), 1);
    assert_eq!(fixed.value_length(), 16);
    assert_eq!(fixed.value(0), &want);
}
3385
/// Decodes two list rows (`[10, 20]` and an empty list) and checks the offsets
/// and child values of the flushed `ListArray`.
#[test]
fn test_array_decoding() {
    let list_type = avro_from_codec(Codec::List(Arc::new(avro_from_codec(Codec::Int32))));
    let mut decoder = Decoder::try_new(&list_type).unwrap();
    let rows: [Vec<u8>; 2] = [
        // Block of two items followed by the zero count that ends the array.
        [
            encode_avro_long(2),
            encode_avro_int(10),
            encode_avro_int(20),
            encode_avro_long(0),
        ]
        .concat(),
        // An empty array is only the zero count.
        encode_avro_long(0),
    ];
    // Each row is decoded through its own cursor.
    for row in &rows {
        decoder.decode(&mut AvroCursor::new(row)).unwrap();
    }
    let flushed = decoder.flush(None).unwrap();
    let lists = flushed.as_any().downcast_ref::<ListArray>().unwrap();
    assert_eq!(lists.len(), 2);
    assert_eq!(lists.value_offsets(), &[0, 2, 2]);
    let ints = lists.values().as_primitive::<Int32Type>();
    assert_eq!(ints.len(), 2);
    for (idx, want) in [10, 20].into_iter().enumerate() {
        assert_eq!(ints.value(idx), want);
    }
}
3412
/// Decodes an array whose block uses a negative item count.
///
/// In Avro's block encoding a negative count means "use the absolute value as the
/// item count, and a `long` byte-size of the block follows". The fixture therefore
/// writes `-3`, the byte length of the three encoded ints, the ints themselves and
/// a terminating zero count, then checks that the list contains `[1, 2, 3]`.
#[test]
fn test_array_decoding_with_negative_block_count() {
    let item_dt = avro_from_codec(Codec::Int32);
    let list_dt = avro_from_codec(Codec::List(Arc::new(item_dt)));
    let mut decoder = Decoder::try_new(&list_dt).unwrap();
    // Encode the items first so the declared block size matches the real payload
    // length instead of relying on a hard-coded (and previously incorrect) value.
    let items: Vec<u8> = [1, 2, 3].into_iter().flat_map(encode_avro_int).collect();
    let block_size = i64::try_from(items.len()).expect("payload length fits in i64");
    let mut data = encode_avro_long(-3);
    data.extend_from_slice(&encode_avro_long(block_size));
    data.extend_from_slice(&items);
    data.extend_from_slice(&encode_avro_long(0));
    let mut cursor = AvroCursor::new(&data);
    decoder.decode(&mut cursor).unwrap();
    let array = decoder.flush(None).unwrap();
    let list_arr = array.as_any().downcast_ref::<ListArray>().unwrap();
    assert_eq!(list_arr.len(), 1);
    assert_eq!(list_arr.value_length(0), 3);
    let values = list_arr.values().as_primitive::<Int32Type>();
    assert_eq!(values.len(), 3);
    assert_eq!(values.value(0), 1);
    assert_eq!(values.value(1), 2);
    assert_eq!(values.value(2), 3);
}
3436
/// Decodes a list-of-lists row containing the single inner list `[5, 6]` and
/// checks both nesting levels of the flushed array.
#[test]
fn test_nested_array_decoding() {
    let int_list = avro_from_codec(Codec::List(Arc::new(avro_from_codec(Codec::Int32))));
    let nested_ty = avro_from_codec(Codec::List(Arc::new(int_list)));
    let mut decoder = Decoder::try_new(&nested_ty).unwrap();
    let buf: Vec<u8> = [
        // Outer array: one item.
        encode_avro_long(1),
        // Inner array: two ints, then its terminating zero count.
        encode_avro_long(2),
        encode_avro_int(5),
        encode_avro_int(6),
        encode_avro_long(0),
        // Terminating zero count of the outer array.
        encode_avro_long(0),
    ]
    .concat();
    decoder.decode(&mut AvroCursor::new(&buf)).unwrap();
    let flushed = decoder.flush(None).unwrap();
    let outer = flushed.as_any().downcast_ref::<ListArray>().unwrap();
    assert_eq!(outer.len(), 1);
    assert_eq!(outer.value_length(0), 1);
    let inner = outer.values().as_any().downcast_ref::<ListArray>().unwrap();
    assert_eq!(inner.len(), 1);
    assert_eq!(inner.value_length(0), 2);
    let leaf = inner.values().as_any().downcast_ref::<Int32Array>().unwrap();
    assert_eq!(leaf.values(), &[5, 6]);
}
3465
/// A list encoded as just a zero block count must decode to one row with no items.
#[test]
fn test_array_decoding_empty_array() {
    let list_type = avro_from_codec(Codec::List(Arc::new(avro_from_codec(Codec::Utf8))));
    let mut decoder = Decoder::try_new(&list_type).unwrap();
    let zero_count = encode_avro_long(0);
    let mut cursor = AvroCursor::new(&zero_count);
    decoder.decode(&mut cursor).unwrap();
    let flushed = decoder.flush(None).unwrap();
    let lists = flushed.as_any().downcast_ref::<ListArray>().unwrap();
    assert_eq!(lists.len(), 1);
    assert_eq!(lists.value_length(0), 0);
}
3478
/// Writer items are plain `int` while reader items are `["null", "int"]`.
/// The resolved list must report nullable items, and data written without any
/// union tags must still decode to two non-null values.
#[test]
fn test_array_decoding_writer_nonunion_items_reader_nullable_items() {
    use crate::schema::Array;
    let reader_schema = Schema::Complex(ComplexType::Array(Array {
        items: Box::new(Schema::Union(vec![
            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
            Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
        ])),
        attributes: Attributes::default(),
    }));
    let writer_schema = Schema::Complex(ComplexType::Array(Array {
        items: Box::new(Schema::TypeName(TypeName::Primitive(PrimitiveType::Int))),
        attributes: Attributes::default(),
    }));
    let resolved = resolved_root_datatype(writer_schema, reader_schema, false, false);
    let Codec::List(item_type) = resolved.codec() else {
        panic!("expected List codec");
    };
    assert_eq!(
        item_type.nullability(),
        Some(Nullability::NullFirst),
        "items should be nullable"
    );
    let mut decoder = Decoder::try_new(&resolved).unwrap();
    // Writer-side bytes: block of two bare ints, then the zero count.
    let data: Vec<u8> = [
        encode_avro_long(2),
        encode_avro_int(10),
        encode_avro_int(20),
        encode_avro_long(0),
    ]
    .concat();
    let mut cursor = AvroCursor::new(&data);
    decoder.decode(&mut cursor).unwrap();
    assert_eq!(
        cursor.position(),
        data.len(),
        "all bytes should be consumed"
    );
    let flushed = decoder.flush(None).unwrap();
    let lists = flushed.as_any().downcast_ref::<ListArray>().unwrap();
    assert_eq!(lists.len(), 1, "one list/row");
    assert_eq!(lists.value_length(0), 2, "two items in the list");
    let ints = lists.values().as_primitive::<Int32Type>();
    assert_eq!(ints.len(), 2);
    for (idx, want) in [10, 20].into_iter().enumerate() {
        assert_eq!(ints.value(idx), want);
    }
    for idx in 0..2 {
        assert!(!ints.is_null(idx));
    }
}
3526
/// Decodes two decimals (precision 50, scale 2) stored as 32-byte big-endian
/// two's-complement fixed values and expects a `Decimal256Array`.
#[test]
fn test_decimal_decoding_fixed256() {
    let dt = avro_from_codec(Codec::Decimal(50, Some(2), Some(32)));
    let mut decoder = Decoder::try_new(&dt).unwrap();
    // 12345 sign-extended to 32 bytes: zero padding, then the 16-byte value.
    let mut positive = [0x00_u8; 32];
    positive[16..].copy_from_slice(&12_345_i128.to_be_bytes());
    // -123 sign-extended to 32 bytes: 0xFF padding, then the 16-byte value.
    let mut negative = [0xFF_u8; 32];
    negative[16..].copy_from_slice(&(-123_i128).to_be_bytes());
    let data: Vec<u8> = [positive, negative].concat();
    // Both values are read back-to-back through one cursor.
    let mut cursor = AvroCursor::new(&data);
    for _ in 0..2 {
        decoder.decode(&mut cursor).unwrap();
    }
    let flushed = decoder.flush(None).unwrap();
    let decimals = flushed.as_any().downcast_ref::<Decimal256Array>().unwrap();
    assert_eq!(decimals.len(), 2);
    for (row, want) in ["123.45", "-1.23"].into_iter().enumerate() {
        assert_eq!(decimals.value_as_string(row), want);
    }
}
3553
/// Decodes two decimals (precision 28, scale 2) stored as 16-byte big-endian
/// two's-complement fixed values and expects a `Decimal128Array`.
#[test]
fn test_decimal_decoding_fixed128() {
    let dt = avro_from_codec(Codec::Decimal(28, Some(2), Some(16)));
    let mut decoder = Decoder::try_new(&dt).unwrap();
    // Unscaled values 12345 and -123 as 16 big-endian bytes each.
    let data: Vec<u8> = [12_345_i128.to_be_bytes(), (-123_i128).to_be_bytes()].concat();
    let mut cursor = AvroCursor::new(&data);
    for _ in 0..2 {
        decoder.decode(&mut cursor).unwrap();
    }
    let flushed = decoder.flush(None).unwrap();
    let decimals = flushed.as_any().downcast_ref::<Decimal128Array>().unwrap();
    assert_eq!(decimals.len(), 2);
    for (row, want) in ["123.45", "-1.23"].into_iter().enumerate() {
        assert_eq!(decimals.value_as_string(row), want);
    }
}
3578
/// Decodes precision-5 decimals stored in 32-byte fixed values. The expected
/// array type is `Decimal32Array` with the `small_decimals` feature and
/// `Decimal128Array` without it.
#[test]
fn test_decimal_decoding_fixed32_from_32byte_fixed_storage() {
    let dt = avro_from_codec(Codec::Decimal(5, Some(2), Some(32)));
    let mut decoder = Decoder::try_new(&dt).unwrap();
    // 12345 sign-extended to 32 bytes: zero padding, then the 16-byte value.
    let mut positive = [0x00_u8; 32];
    positive[16..].copy_from_slice(&12_345_i128.to_be_bytes());
    // -123 sign-extended to 32 bytes: 0xFF padding, then the 16-byte value.
    let mut negative = [0xFF_u8; 32];
    negative[16..].copy_from_slice(&(-123_i128).to_be_bytes());
    let data: Vec<u8> = [positive, negative].concat();
    let mut cursor = AvroCursor::new(&data);
    for _ in 0..2 {
        decoder.decode(&mut cursor).unwrap();
    }
    let flushed = decoder.flush(None).unwrap();
    // Only the concrete array type depends on the feature; the checks are shared.
    #[cfg(feature = "small_decimals")]
    let decimals = flushed.as_any().downcast_ref::<Decimal32Array>().unwrap();
    #[cfg(not(feature = "small_decimals"))]
    let decimals = flushed.as_any().downcast_ref::<Decimal128Array>().unwrap();
    assert_eq!(decimals.len(), 2);
    for (row, want) in ["123.45", "-1.23"].into_iter().enumerate() {
        assert_eq!(decimals.value_as_string(row), want);
    }
}
3615
/// Decodes precision-5 decimals stored in 16-byte fixed values. The expected
/// array type is `Decimal32Array` with the `small_decimals` feature and
/// `Decimal128Array` without it.
#[test]
fn test_decimal_decoding_fixed32_from_16byte_fixed_storage() {
    let dt = avro_from_codec(Codec::Decimal(5, Some(2), Some(16)));
    let mut decoder = Decoder::try_new(&dt).unwrap();
    // Unscaled values 12345 and -123 as 16 big-endian bytes each.
    let data: Vec<u8> = [12_345_i128.to_be_bytes(), (-123_i128).to_be_bytes()].concat();
    let mut cursor = AvroCursor::new(&data);
    for _ in 0..2 {
        decoder.decode(&mut cursor).unwrap();
    }

    let flushed = decoder.flush(None).unwrap();
    // Only the concrete array type depends on the feature; the checks are shared.
    #[cfg(feature = "small_decimals")]
    let decimals = flushed.as_any().downcast_ref::<Decimal32Array>().unwrap();
    #[cfg(not(feature = "small_decimals"))]
    let decimals = flushed.as_any().downcast_ref::<Decimal128Array>().unwrap();
    assert_eq!(decimals.len(), 2);
    for (row, want) in ["123.45", "-1.23"].into_iter().enumerate() {
        assert_eq!(decimals.value_as_string(row), want);
    }
}
3651
/// Decodes a nullable, bytes-backed decimal (precision 4, scale 1) through a
/// hand-built `Decoder::Nullable` and expects the rows value / null / value.
#[test]
fn test_decimal_decoding_bytes_with_nulls() {
    let dt = avro_from_codec(Codec::Decimal(4, Some(1), None));
    let plan = NullablePlan::ReadTag {
        nullability: Nullability::NullSecond,
        resolution: ResolutionPlan::Promotion(Promotion::Direct),
    };
    let mut decoder = Decoder::Nullable(
        plan,
        NullBufferBuilder::new(DEFAULT_CAPACITY),
        Box::new(Decoder::try_new(&dt).unwrap()),
    );
    // Tags 0, 1, 0: the tag-0 rows carry a value, the tag-1 row is the null.
    let data: Vec<u8> = [
        encode_avro_int(0),
        encode_avro_bytes(&1234_i16.to_be_bytes()),
        encode_avro_int(1),
        encode_avro_int(0),
        encode_avro_bytes(&(-1234_i16).to_be_bytes()),
    ]
    .concat();
    let mut cursor = AvroCursor::new(&data);
    for _ in 0..3 {
        decoder.decode(&mut cursor).unwrap();
    }
    let flushed = decoder.flush(None).unwrap();
    // Only the concrete array type depends on the feature; the checks are shared.
    #[cfg(feature = "small_decimals")]
    let decimals = flushed.as_any().downcast_ref::<Decimal32Array>().unwrap();
    #[cfg(not(feature = "small_decimals"))]
    let decimals = flushed.as_any().downcast_ref::<Decimal128Array>().unwrap();
    assert_eq!(decimals.len(), 3);
    assert!(decimals.is_valid(0));
    assert!(!decimals.is_valid(1));
    assert!(decimals.is_valid(2));
    assert_eq!(decimals.value_as_string(0), "123.4");
    assert_eq!(decimals.value_as_string(2), "-123.4");
}
3696
/// Decodes a nullable decimal (precision 6, scale 2) stored in 16-byte fixed
/// values through a hand-built `Decoder::Nullable`; expects value / null / value.
#[test]
fn test_decimal_decoding_bytes_with_nulls_fixed_size_narrow_result() {
    let dt = avro_from_codec(Codec::Decimal(6, Some(2), Some(16)));
    let plan = NullablePlan::ReadTag {
        nullability: Nullability::NullSecond,
        resolution: ResolutionPlan::Promotion(Promotion::Direct),
    };
    let mut decoder = Decoder::Nullable(
        plan,
        NullBufferBuilder::new(DEFAULT_CAPACITY),
        Box::new(Decoder::try_new(&dt).unwrap()),
    );
    // Tags 0, 1, 0: the tag-0 rows carry the 16 big-endian bytes of +/-123456.
    let data: Vec<u8> = [
        encode_avro_int(0),
        123_456_i128.to_be_bytes().to_vec(),
        encode_avro_int(1),
        encode_avro_int(0),
        (-123_456_i128).to_be_bytes().to_vec(),
    ]
    .concat();
    let mut cursor = AvroCursor::new(&data);
    for _ in 0..3 {
        decoder.decode(&mut cursor).unwrap();
    }
    let flushed = decoder.flush(None).unwrap();
    // Only the concrete array type depends on the feature; the checks are shared.
    #[cfg(feature = "small_decimals")]
    let decimals = flushed.as_any().downcast_ref::<Decimal32Array>().unwrap();
    #[cfg(not(feature = "small_decimals"))]
    let decimals = flushed.as_any().downcast_ref::<Decimal128Array>().unwrap();
    assert_eq!(decimals.len(), 3);
    assert!(decimals.is_valid(0));
    assert!(!decimals.is_valid(1));
    assert!(decimals.is_valid(2));
    assert_eq!(decimals.value_as_string(0), "1234.56");
    assert_eq!(decimals.value_as_string(2), "-1234.56");
}
3749
/// Decodes the enum indices 2, 0, 1 and expects a dictionary array whose values
/// are the symbols `A`, `B`, `C` and whose keys are the decoded indices.
#[test]
fn test_enum_decoding() {
    let names = ["A", "B", "C"];
    let symbols: Arc<[String]> = names.iter().map(|name| name.to_string()).collect();
    let enum_type = avro_from_codec(Codec::Enum(symbols));
    let mut decoder = Decoder::try_new(&enum_type).unwrap();
    let data: Vec<u8> = [2, 0, 1].into_iter().flat_map(encode_avro_int).collect();
    let mut cursor = AvroCursor::new(&data);
    for _ in 0..3 {
        decoder.decode(&mut cursor).unwrap();
    }
    let flushed = decoder.flush(None).unwrap();
    let dictionary = flushed
        .as_any()
        .downcast_ref::<DictionaryArray<Int32Type>>()
        .unwrap();
    assert_eq!(dictionary.len(), 3);
    let labels = dictionary
        .values()
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    for (idx, name) in names.into_iter().enumerate() {
        assert_eq!(labels.value(idx), name);
    }
    assert_eq!(dictionary.keys().values(), &[2, 0, 1]);
}
3779
/// Decodes a null-first nullable enum with rows `Y`, null, `X` and checks the
/// validity, keys and dictionary values of the result.
#[test]
fn test_enum_decoding_with_nulls() {
    let names = ["X", "Y"];
    let symbols: Arc<[String]> = names.iter().map(|name| name.to_string()).collect();
    let nullable_enum = AvroDataType::new(
        Codec::Enum(symbols),
        Default::default(),
        Some(Nullability::NullFirst),
    );
    let mut decoder = Decoder::try_new(&nullable_enum).unwrap();
    // Tag 1 is followed by an enum index; tag 0 is the null row.
    let data: Vec<u8> = [
        encode_avro_long(1),
        encode_avro_int(1),
        encode_avro_long(0),
        encode_avro_long(1),
        encode_avro_int(0),
    ]
    .concat();
    let mut cursor = AvroCursor::new(&data);
    for _ in 0..3 {
        decoder.decode(&mut cursor).unwrap();
    }
    let flushed = decoder.flush(None).unwrap();
    let dictionary = flushed
        .as_any()
        .downcast_ref::<DictionaryArray<Int32Type>>()
        .unwrap();
    assert_eq!(dictionary.len(), 3);
    assert!(dictionary.is_valid(0));
    assert!(dictionary.is_null(1));
    assert!(dictionary.is_valid(2));
    let want_keys = Int32Array::from(vec![Some(1), None, Some(0)]);
    assert_eq!(dictionary.keys(), &want_keys);
    let labels = dictionary
        .values()
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    for (idx, name) in names.into_iter().enumerate() {
        assert_eq!(labels.value(idx), name);
    }
}
3816
/// Decodes a null-first nullable `Codec::Interval` with rows value / null / value.
/// Each value is three little-endian `u32`s; the expected output shows the third
/// component multiplied by 1_000_000 in the `nanoseconds` field.
#[test]
fn test_duration_decoding_with_nulls() {
    let nullable_interval = AvroDataType::new(
        Codec::Interval,
        Default::default(),
        Some(Nullability::NullFirst),
    );
    let mut decoder = Decoder::try_new(&nullable_interval).unwrap();
    // Serializes one 12-byte duration payload.
    let duration = |months: u32, days: u32, millis: u32| -> Vec<u8> {
        [months, days, millis]
            .into_iter()
            .flat_map(u32::to_le_bytes)
            .collect()
    };
    // Tag 1 is followed by a payload; tag 0 is the null row.
    let data: Vec<u8> = [
        encode_avro_long(1),
        duration(1, 2, 3),
        encode_avro_long(0),
        encode_avro_long(1),
        duration(4, 5, 6),
    ]
    .concat();
    let mut cursor = AvroCursor::new(&data);
    for _ in 0..3 {
        decoder.decode(&mut cursor).unwrap();
    }
    let flushed = decoder.flush(None).unwrap();
    let intervals = flushed
        .as_any()
        .downcast_ref::<IntervalMonthDayNanoArray>()
        .unwrap();
    assert_eq!(intervals.len(), 3);
    assert!(intervals.is_valid(0));
    assert!(intervals.is_null(1));
    assert!(intervals.is_valid(2));
    let want = IntervalMonthDayNanoArray::from(vec![
        Some(IntervalMonthDayNano {
            months: 1,
            days: 2,
            nanoseconds: 3_000_000,
        }),
        None,
        Some(IntervalMonthDayNano {
            months: 4,
            days: 5,
            nanoseconds: 6_000_000,
        }),
    ]);
    assert_eq!(intervals, &want);
}
3870
/// Decodes a null-first nullable `Codec::IntervalMonthDayNano` with rows
/// value / null / value. Each value is a little-endian `i32` months, `i32` days
/// and `i64` nanoseconds, and negative components must survive unchanged.
#[cfg(feature = "avro_custom_types")]
#[test]
fn test_interval_month_day_nano_custom_decoding_with_nulls() {
    let nullable_interval = AvroDataType::new(
        Codec::IntervalMonthDayNano,
        Default::default(),
        Some(Nullability::NullFirst),
    );
    let mut decoder = Decoder::try_new(&nullable_interval).unwrap();
    // Serializes one 16-byte interval payload.
    let interval = |months: i32, days: i32, nanos: i64| -> Vec<u8> {
        let mut bytes = Vec::with_capacity(16);
        bytes.extend_from_slice(&months.to_le_bytes());
        bytes.extend_from_slice(&days.to_le_bytes());
        bytes.extend_from_slice(&nanos.to_le_bytes());
        bytes
    };
    // Tag 1 is followed by a payload; tag 0 is the null row.
    let data: Vec<u8> = [
        encode_avro_long(1),
        interval(1, -2, 3),
        encode_avro_long(0),
        encode_avro_long(1),
        interval(-4, 5, -6),
    ]
    .concat();
    let mut cursor = AvroCursor::new(&data);
    for _ in 0..3 {
        decoder.decode(&mut cursor).unwrap();
    }
    let flushed = decoder.flush(None).unwrap();
    let intervals = flushed
        .as_any()
        .downcast_ref::<IntervalMonthDayNanoArray>()
        .unwrap();
    assert_eq!(intervals.len(), 3);
    let want = IntervalMonthDayNanoArray::from(vec![
        Some(IntervalMonthDayNano::new(1, -2, 3)),
        None,
        Some(IntervalMonthDayNano::new(-4, 5, -6)),
    ]);
    assert_eq!(intervals, &want);
}
3910
/// Flushing a non-nullable `Codec::Interval` decoder that never decoded a row
/// must produce an empty array.
#[test]
fn test_duration_decoding_empty() {
    let interval_type = AvroDataType::new(Codec::Interval, Default::default(), None);
    let mut decoder = Decoder::try_new(&interval_type).unwrap();
    let flushed = decoder.flush(None).unwrap();
    assert_eq!(flushed.len(), 0);
}
3919
/// Decodes three longs with `Codec::DurationSeconds` and expects a
/// `DurationSecondArray` holding the same numbers.
#[test]
#[cfg(feature = "avro_custom_types")]
fn test_duration_seconds_decoding() {
    let seconds_type = AvroDataType::new(Codec::DurationSeconds, Default::default(), None);
    let mut decoder = Decoder::try_new(&seconds_type).unwrap();
    // All three values share one buffer and one cursor.
    let data: Vec<u8> = [0_i64, -1, 2]
        .into_iter()
        .flat_map(encode_avro_long)
        .collect();
    let mut cursor = AvroCursor::new(&data);
    for _ in 0..3 {
        decoder.decode(&mut cursor).unwrap();
    }
    let flushed = decoder.flush(None).unwrap();
    let durations = flushed
        .as_any()
        .downcast_ref::<DurationSecondArray>()
        .unwrap();
    assert_eq!(durations.values(), &[0, -1, 2]);
}
3941
/// Decodes three longs with `Codec::DurationMillis` and expects a
/// `DurationMillisecondArray` holding the same numbers.
#[test]
#[cfg(feature = "avro_custom_types")]
fn test_duration_milliseconds_decoding() {
    let millis_type = AvroDataType::new(Codec::DurationMillis, Default::default(), None);
    let mut decoder = Decoder::try_new(&millis_type).unwrap();
    let inputs = [1_i64, 0, -2];
    let data: Vec<u8> = inputs.into_iter().flat_map(encode_avro_long).collect();
    let mut cursor = AvroCursor::new(&data);
    // One decode call per encoded value.
    (0..3).for_each(|_| decoder.decode(&mut cursor).unwrap());
    let flushed = decoder.flush(None).unwrap();
    let durations = flushed
        .as_any()
        .downcast_ref::<DurationMillisecondArray>()
        .unwrap();
    assert_eq!(durations.values(), &[1, 0, -2]);
}
3962
/// Decodes three longs with `Codec::DurationMicros` and expects a
/// `DurationMicrosecondArray` holding the same numbers.
#[test]
#[cfg(feature = "avro_custom_types")]
fn test_duration_microseconds_decoding() {
    let micros_type = AvroDataType::new(Codec::DurationMicros, Default::default(), None);
    let mut decoder = Decoder::try_new(&micros_type).unwrap();
    let inputs = [5_i64, -6, 7];
    let data: Vec<u8> = inputs.into_iter().flat_map(encode_avro_long).collect();
    let mut cursor = AvroCursor::new(&data);
    // One decode call per encoded value.
    (0..3).for_each(|_| decoder.decode(&mut cursor).unwrap());
    let flushed = decoder.flush(None).unwrap();
    let durations = flushed
        .as_any()
        .downcast_ref::<DurationMicrosecondArray>()
        .unwrap();
    assert_eq!(durations.values(), &[5, -6, 7]);
}
3983
#[test]
#[cfg(feature = "avro_custom_types")]
fn test_duration_nanoseconds_decoding() {
    // Round-trips three longs through a nanosecond-duration decoder.
    let inputs = [8i64, 9, -10];
    let nanos_type = AvroDataType::new(Codec::DurationNanos, Default::default(), None);
    let mut nanos_decoder = Decoder::try_new(&nanos_type).unwrap();
    let mut encoded = Vec::new();
    for value in inputs {
        encoded.extend_from_slice(&encode_avro_long(value));
    }
    let mut reader = AvroCursor::new(&encoded);
    for _ in 0..inputs.len() {
        nanos_decoder.decode(&mut reader).unwrap();
    }
    let flushed = nanos_decoder.flush(None).unwrap();
    let durations = flushed
        .as_any()
        .downcast_ref::<DurationNanosecondArray>()
        .unwrap();
    assert_eq!(durations.values(), &[8, 9, -10]);
}
4004
#[test]
fn test_nullable_decode_error_bitmap_corruption() {
    // A decode that fails in the middle of a batch must not leave anything behind:
    // only the two rows that decoded successfully may show up after `flush`.
    let nullable_int = AvroDataType::new(
        Codec::Int32,
        Default::default(),
        Some(Nullability::NullSecond),
    );
    let mut decoder = Decoder::try_new(&nullable_int).unwrap();

    // Holds only the int 1; asserted below to come out as a null entry.
    let null_row = encode_avro_int(1);
    // Holds only the int 0 with nothing after it; decoding this row must fail.
    let truncated_row = encode_avro_int(0);
    // Holds the int 0 followed by the int 42.
    let mut value_row = Vec::new();
    value_row.extend_from_slice(&encode_avro_int(0));
    value_row.extend_from_slice(&encode_avro_int(42));

    decoder.decode(&mut AvroCursor::new(&null_row)).unwrap();
    assert!(decoder.decode(&mut AvroCursor::new(&truncated_row)).is_err());
    decoder.decode(&mut AvroCursor::new(&value_row)).unwrap();

    let flushed = decoder.flush(None).unwrap();
    assert_eq!(flushed.len(), 2);
    let ints = flushed.as_any().downcast_ref::<Int32Array>().unwrap();
    assert!(ints.is_null(0));
    assert_eq!(ints.value(1), 42);
}
4040
#[test]
fn test_enum_mapping_reordered_symbols() {
    // Writer indices 0, 1, 2 are remapped through `[2, 0, 1]` into the reader's
    // symbol order `["B", "C", "A"]`.
    let symbols: Arc<[String]> =
        Arc::from(vec!["B".to_string(), "C".to_string(), "A".to_string()]);
    let writer_to_reader: Arc<[i32]> = vec![2, 0, 1].into();
    let resolution = EnumResolution {
        mapping: writer_to_reader,
        default_index: -1,
    };
    let mut decoder = Decoder::Enum(
        Vec::with_capacity(DEFAULT_CAPACITY),
        Arc::clone(&symbols),
        Some(resolution),
    );
    let mut encoded = Vec::new();
    for writer_index in 0..3 {
        encoded.extend_from_slice(&encode_avro_int(writer_index));
    }
    let mut reader = AvroCursor::new(&encoded);
    for _ in 0..3 {
        decoder.decode(&mut reader).unwrap();
    }
    let flushed = decoder.flush(None).unwrap();
    let dictionary = flushed
        .as_any()
        .downcast_ref::<DictionaryArray<Int32Type>>()
        .unwrap();
    assert_eq!(dictionary.keys(), &Int32Array::from(vec![2, 0, 1]));
    let dictionary_values = dictionary
        .values()
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    for (position, symbol) in ["B", "C", "A"].into_iter().enumerate() {
        assert_eq!(dictionary_values.value(position), symbol);
    }
}
4079
#[test]
fn test_enum_mapping_unknown_symbol_and_out_of_range_fall_back_to_default() {
    // Writer indices 0 and 1 map straight through; index 99 lies outside the
    // two-entry mapping and is expected to come out as the default index (1).
    let symbols: Arc<[String]> = Arc::from(vec!["A".to_string(), "B".to_string()]);
    let writer_to_reader: Arc<[i32]> = vec![0, 1].into();
    let resolution = EnumResolution {
        mapping: writer_to_reader,
        default_index: 1,
    };
    let mut decoder = Decoder::Enum(
        Vec::with_capacity(DEFAULT_CAPACITY),
        Arc::clone(&symbols),
        Some(resolution),
    );
    let mut encoded = Vec::new();
    for writer_index in [0, 1, 99] {
        encoded.extend_from_slice(&encode_avro_int(writer_index));
    }
    let mut reader = AvroCursor::new(&encoded);
    for _ in 0..3 {
        decoder.decode(&mut reader).unwrap();
    }
    let flushed = decoder.flush(None).unwrap();
    let dictionary = flushed
        .as_any()
        .downcast_ref::<DictionaryArray<Int32Type>>()
        .unwrap();
    assert_eq!(dictionary.keys(), &Int32Array::from(vec![0, 1, 1]));
    let dictionary_values = dictionary
        .values()
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    for (position, symbol) in ["A", "B"].into_iter().enumerate() {
        assert_eq!(dictionary_values.value(position), symbol);
    }
}
4116
#[test]
fn test_enum_mapping_unknown_symbol_without_default_errors() {
    // The only mapping entry is -1 and the default index is -1 as well, so
    // decoding writer index 0 is expected to produce an error.
    let symbols: Arc<[String]> = Arc::from(vec!["A".to_string()]);
    let writer_to_reader: Arc<[i32]> = vec![-1].into();
    let resolution = EnumResolution {
        mapping: writer_to_reader,
        default_index: -1,
    };
    let mut decoder = Decoder::Enum(
        Vec::with_capacity(DEFAULT_CAPACITY),
        symbols,
        Some(resolution),
    );
    let encoded = encode_avro_int(0);
    let mut reader = AvroCursor::new(&encoded);
    let msg = decoder
        .decode(&mut reader)
        .expect_err("expected decode error for unresolved enum without default")
        .to_string();
    assert!(
        msg.contains("not resolvable") && msg.contains("no default"),
        "unexpected error message: {msg}"
    );
}
4141
4142 fn make_record_resolved_decoder(
4143 reader_fields: &[(&str, DataType, bool)],
4144 writer_projections: Vec<FieldProjection>,
4145 ) -> Decoder {
4146 let mut field_refs: Vec<FieldRef> = Vec::with_capacity(reader_fields.len());
4147 let mut encodings: Vec<Decoder> = Vec::with_capacity(reader_fields.len());
4148 for (name, dt, nullable) in reader_fields {
4149 field_refs.push(Arc::new(ArrowField::new(*name, dt.clone(), *nullable)));
4150 let enc = match dt {
4151 DataType::Int32 => Decoder::Int32(Vec::new()),
4152 DataType::Int64 => Decoder::Int64(Vec::new()),
4153 DataType::Utf8 => {
4154 Decoder::String(OffsetBufferBuilder::new(DEFAULT_CAPACITY), Vec::new())
4155 }
4156 other => panic!("Unsupported test reader field type: {other:?}"),
4157 };
4158 encodings.push(enc);
4159 }
4160 let fields: Fields = field_refs.into();
4161 Decoder::Record(
4162 fields,
4163 encodings,
4164 vec![None; reader_fields.len()],
4165 Some(Projector {
4166 writer_projections,
4167 default_injections: Arc::from(Vec::<(usize, AvroLiteral)>::new()),
4168 }),
4169 )
4170 }
4171
#[test]
fn test_skip_writer_trailing_field_int32() {
    // The writer row holds two ints; the first goes to reader column `id`,
    // the second is skipped.
    let projections = vec![
        FieldProjection::ToReader(0),
        FieldProjection::Skip(super::Skipper::Int32),
    ];
    let mut decoder = make_record_resolved_decoder(
        &[("id", arrow_schema::DataType::Int32, false)],
        projections,
    );
    let mut encoded = Vec::new();
    for value in [7, 999] {
        encoded.extend_from_slice(&encode_avro_int(value));
    }
    let mut reader = AvroCursor::new(&encoded);
    decoder.decode(&mut reader).unwrap();
    // The skipped field's bytes must have been consumed as well.
    assert_eq!(reader.position(), encoded.len());
    let flushed = decoder.flush(None).unwrap();
    let record = flushed.as_any().downcast_ref::<StructArray>().unwrap();
    assert_eq!(record.len(), 1);
    let id_column = record.column_by_name("id").unwrap();
    let ids = id_column.as_any().downcast_ref::<Int32Array>().unwrap();
    assert_eq!(ids.value(0), 7);
}
4198
#[test]
fn test_skip_writer_middle_field_string() {
    // The writer row is int, length-prefixed bytes, long; the middle value is
    // skipped and the outer two land in reader columns `id` and `score`.
    let reader_fields = [
        ("id", DataType::Int32, false),
        ("score", DataType::Int64, false),
    ];
    let projections = vec![
        FieldProjection::ToReader(0),
        FieldProjection::Skip(Skipper::String),
        FieldProjection::ToReader(1),
    ];
    let mut decoder = make_record_resolved_decoder(&reader_fields, projections);
    let mut encoded = Vec::new();
    encoded.extend_from_slice(&encode_avro_int(42));
    encoded.extend_from_slice(&encode_avro_bytes(b"abcdef"));
    encoded.extend_from_slice(&encode_avro_long(1000));
    let mut reader = AvroCursor::new(&encoded);
    decoder.decode(&mut reader).unwrap();
    assert_eq!(reader.position(), encoded.len());
    let flushed = decoder.flush(None).unwrap();
    let record = flushed.as_any().downcast_ref::<StructArray>().unwrap();
    let id_column = record.column_by_name("id").unwrap();
    let ids = id_column.as_any().downcast_ref::<Int32Array>().unwrap();
    let score_column = record.column_by_name("score").unwrap();
    let scores = score_column
        .as_any()
        .downcast_ref::<Int64Array>()
        .unwrap();
    assert_eq!(ids.value(0), 42);
    assert_eq!(scores.value(0), 1000);
}
4236
#[test]
fn test_skip_writer_array_with_negative_block_count_fast() {
    // The skipped array is encoded as: long -3, the payload length in bytes,
    // the three int items, then long 0. The int 5 that follows is the `id` value.
    let projections = vec![
        FieldProjection::Skip(super::Skipper::List(Box::new(Skipper::Int32))),
        FieldProjection::ToReader(0),
    ];
    let mut decoder =
        make_record_resolved_decoder(&[("id", DataType::Int32, false)], projections);
    let mut items = Vec::new();
    for item in 1..=3 {
        items.extend_from_slice(&encode_avro_int(item));
    }
    let mut encoded = Vec::new();
    encoded.extend_from_slice(&encode_avro_long(-3));
    encoded.extend_from_slice(&encode_avro_long(items.len() as i64));
    encoded.extend_from_slice(&items);
    encoded.extend_from_slice(&encode_avro_long(0));
    encoded.extend_from_slice(&encode_avro_int(5));
    let mut reader = AvroCursor::new(&encoded);
    decoder.decode(&mut reader).unwrap();
    assert_eq!(reader.position(), encoded.len());
    let flushed = decoder.flush(None).unwrap();
    let record = flushed.as_any().downcast_ref::<StructArray>().unwrap();
    let id_column = record.column_by_name("id").unwrap();
    let ids = id_column.as_any().downcast_ref::<Int32Array>().unwrap();
    assert_eq!(ids.len(), 1);
    assert_eq!(ids.value(0), 5);
}
4270
#[test]
fn test_skip_writer_map_with_negative_block_count_fast() {
    // The skipped map is encoded as: long -2, the payload length in bytes,
    // two key/value entries, then long 0. The int 123 that follows is the `id` value.
    let projections = vec![
        FieldProjection::Skip(Skipper::Map(Box::new(Skipper::Int32))),
        FieldProjection::ToReader(0),
    ];
    let mut decoder =
        make_record_resolved_decoder(&[("id", DataType::Int32, false)], projections);
    let mut map_payload = Vec::new();
    map_payload.extend_from_slice(&encode_avro_bytes(b"k1"));
    map_payload.extend_from_slice(&encode_avro_int(10));
    map_payload.extend_from_slice(&encode_avro_bytes(b"k2"));
    map_payload.extend_from_slice(&encode_avro_int(20));
    let mut encoded = Vec::new();
    encoded.extend_from_slice(&encode_avro_long(-2));
    encoded.extend_from_slice(&encode_avro_long(map_payload.len() as i64));
    encoded.extend_from_slice(&map_payload);
    encoded.extend_from_slice(&encode_avro_long(0));
    encoded.extend_from_slice(&encode_avro_int(123));
    let mut reader = AvroCursor::new(&encoded);
    decoder.decode(&mut reader).unwrap();
    assert_eq!(reader.position(), encoded.len());
    let flushed = decoder.flush(None).unwrap();
    let record = flushed.as_any().downcast_ref::<StructArray>().unwrap();
    let id_column = record.column_by_name("id").unwrap();
    let ids = id_column.as_any().downcast_ref::<Int32Array>().unwrap();
    assert_eq!(ids.len(), 1);
    assert_eq!(ids.value(0), 123);
}
4305
#[test]
fn test_skip_writer_nullable_field_union_nullfirst() {
    // The skipped writer field is a null-first nullable int. The first row
    // carries long 0 and no skipped payload; the second carries long 1 followed
    // by the int 123. Each row then ends with the `id` value.
    let skipped = super::Skipper::Nullable(
        Nullability::NullFirst,
        Box::new(super::Skipper::Int32),
    );
    let mut decoder = make_record_resolved_decoder(
        &[("id", DataType::Int32, false)],
        vec![FieldProjection::Skip(skipped), FieldProjection::ToReader(0)],
    );

    let mut first_row = Vec::new();
    first_row.extend_from_slice(&encode_avro_long(0));
    first_row.extend_from_slice(&encode_avro_int(5));

    let mut second_row = Vec::new();
    second_row.extend_from_slice(&encode_avro_long(1));
    second_row.extend_from_slice(&encode_avro_int(123));
    second_row.extend_from_slice(&encode_avro_int(7));

    let mut first_reader = AvroCursor::new(&first_row);
    let mut second_reader = AvroCursor::new(&second_row);
    decoder.decode(&mut first_reader).unwrap();
    decoder.decode(&mut second_reader).unwrap();
    assert_eq!(first_reader.position(), first_row.len());
    assert_eq!(second_reader.position(), second_row.len());

    let flushed = decoder.flush(None).unwrap();
    let record = flushed.as_any().downcast_ref::<StructArray>().unwrap();
    let id_column = record.column_by_name("id").unwrap();
    let ids = id_column.as_any().downcast_ref::<Int32Array>().unwrap();
    assert_eq!(ids.len(), 2);
    assert_eq!(ids.value(0), 5);
    assert_eq!(ids.value(1), 7);
}
4343
4344 fn make_dense_union_avro(
4345 children: Vec<(Codec, &'_ str, DataType)>,
4346 type_ids: Vec<i8>,
4347 ) -> AvroDataType {
4348 let mut avro_children: Vec<AvroDataType> = Vec::with_capacity(children.len());
4349 let mut fields: Vec<arrow_schema::Field> = Vec::with_capacity(children.len());
4350 for (codec, name, dt) in children.into_iter() {
4351 avro_children.push(AvroDataType::new(codec, Default::default(), None));
4352 fields.push(arrow_schema::Field::new(name, dt, true));
4353 }
4354 let union_fields = UnionFields::try_new(type_ids, fields).unwrap();
4355 let union_codec = Codec::Union(avro_children.into(), union_fields, UnionMode::Dense);
4356 AvroDataType::new(union_codec, Default::default(), None)
4357 }
4358
#[test]
fn test_union_dense_two_children_custom_type_ids() {
    // Branch 0 (int) is registered with type id 2, branch 1 (string) with type id 5.
    let union_type = make_dense_union_avro(
        vec![
            (Codec::Int32, "i", DataType::Int32),
            (Codec::Utf8, "s", DataType::Utf8),
        ],
        vec![2, 5],
    );
    let mut decoder = Decoder::try_new(&union_type).unwrap();

    // Every row is a branch index followed by that branch's payload.
    let mut first_int_row = Vec::new();
    first_int_row.extend_from_slice(&encode_avro_long(0));
    first_int_row.extend_from_slice(&encode_avro_int(7));
    let mut string_row = Vec::new();
    string_row.extend_from_slice(&encode_avro_long(1));
    string_row.extend_from_slice(&encode_avro_bytes(b"x"));
    let mut second_int_row = Vec::new();
    second_int_row.extend_from_slice(&encode_avro_long(0));
    second_int_row.extend_from_slice(&encode_avro_int(-1));
    for row in [&first_int_row, &string_row, &second_int_row] {
        decoder.decode(&mut AvroCursor::new(row)).unwrap();
    }

    let flushed = decoder.flush(None).unwrap();
    let union_array = flushed
        .as_any()
        .downcast_ref::<UnionArray>()
        .expect("expected UnionArray");
    assert_eq!(union_array.len(), 3);
    let type_ids: Vec<_> = (0..3).map(|row| union_array.type_id(row)).collect();
    assert_eq!(type_ids, [2, 5, 2]);
    let offsets: Vec<_> = (0..3).map(|row| union_array.value_offset(row)).collect();
    assert_eq!(offsets, [0, 0, 1]);

    let ints = union_array
        .child(2)
        .as_any()
        .downcast_ref::<Int32Array>()
        .expect("int child");
    assert_eq!(ints.len(), 2);
    assert_eq!(ints.value(0), 7);
    assert_eq!(ints.value(1), -1);

    let strings = union_array
        .child(5)
        .as_any()
        .downcast_ref::<StringArray>()
        .expect("string child");
    assert_eq!(strings.len(), 1);
    assert_eq!(strings.value(0), "x");
}
4409
#[test]
fn test_union_dense_with_null_and_string_children() {
    // Branch 0 (null) is registered with type id 42, branch 1 (string) with type id 7.
    let union_type = make_dense_union_avro(
        vec![
            (Codec::Null, "n", DataType::Null),
            (Codec::Utf8, "s", DataType::Utf8),
        ],
        vec![42, 7],
    );
    let mut decoder = Decoder::try_new(&union_type).unwrap();

    // The rows for branch 0 consist of the branch index alone.
    let first_null_row = encode_avro_long(0);
    let mut string_row = Vec::new();
    string_row.extend_from_slice(&encode_avro_long(1));
    string_row.extend_from_slice(&encode_avro_bytes(b"abc"));
    let second_null_row = encode_avro_long(0);
    decoder
        .decode(&mut AvroCursor::new(&first_null_row))
        .unwrap();
    decoder.decode(&mut AvroCursor::new(&string_row)).unwrap();
    decoder
        .decode(&mut AvroCursor::new(&second_null_row))
        .unwrap();

    let flushed = decoder.flush(None).unwrap();
    let union_array = flushed
        .as_any()
        .downcast_ref::<UnionArray>()
        .expect("expected UnionArray");
    assert_eq!(union_array.len(), 3);
    let type_ids: Vec<_> = (0..3).map(|row| union_array.type_id(row)).collect();
    assert_eq!(type_ids, [42, 7, 42]);
    let offsets: Vec<_> = (0..3).map(|row| union_array.value_offset(row)).collect();
    assert_eq!(offsets, [0, 0, 1]);

    let nulls = union_array
        .child(42)
        .as_any()
        .downcast_ref::<NullArray>()
        .expect("null child");
    assert_eq!(nulls.len(), 2);

    let strings = union_array
        .child(7)
        .as_any()
        .downcast_ref::<StringArray>()
        .expect("string child");
    assert_eq!(strings.len(), 1);
    assert_eq!(strings.value(0), "abc");
}
4454
#[test]
fn test_union_decode_negative_branch_index_errors() {
    // A row whose branch index is -1 must be rejected with a descriptive error.
    let branches = vec![
        (Codec::Int32, "i", DataType::Int32),
        (Codec::Utf8, "s", DataType::Utf8),
    ];
    let union_type = make_dense_union_avro(branches, vec![0, 1]);
    let mut decoder = Decoder::try_new(&union_type).unwrap();
    let encoded = encode_avro_long(-1);
    let msg = decoder
        .decode(&mut AvroCursor::new(&encoded))
        .expect_err("expected error for negative branch index")
        .to_string();
    assert!(
        msg.contains("Negative union branch index"),
        "unexpected error message: {msg}"
    );
}
4475
#[test]
fn test_union_decode_out_of_range_branch_index_errors() {
    // The union has two branches, so branch index 2 must be rejected.
    let branches = vec![
        (Codec::Int32, "i", DataType::Int32),
        (Codec::Utf8, "s", DataType::Utf8),
    ];
    let union_type = make_dense_union_avro(branches, vec![10, 11]);
    let mut decoder = Decoder::try_new(&union_type).unwrap();
    let encoded = encode_avro_long(2);
    let msg = decoder
        .decode(&mut AvroCursor::new(&encoded))
        .expect_err("expected error for out-of-range branch index")
        .to_string();
    assert!(
        msg.contains("out of range"),
        "unexpected error message: {msg}"
    );
}
4496
#[test]
fn test_union_sparse_mode_not_supported() {
    // Building a decoder for a union declared with `UnionMode::Sparse` must fail.
    let arrow_fields = vec![
        arrow_schema::Field::new("i", DataType::Int32, true),
        arrow_schema::Field::new("s", DataType::Utf8, true),
    ];
    let union_fields = UnionFields::try_new(vec![1, 3], arrow_fields).unwrap();
    let branch_types: Vec<AvroDataType> = vec![
        AvroDataType::new(Codec::Int32, Default::default(), None),
        AvroDataType::new(Codec::Utf8, Default::default(), None),
    ];
    let sparse_union = AvroDataType::new(
        Codec::Union(branch_types.into(), union_fields, UnionMode::Sparse),
        Default::default(),
        None,
    );
    let msg = Decoder::try_new(&sparse_union)
        .expect_err("sparse union should not be supported")
        .to_string();
    assert!(
        msg.contains("Sparse Arrow unions are not yet supported"),
        "unexpected error message: {msg}"
    );
}
4520
4521 fn make_record_decoder_with_projector_defaults(
4522 reader_fields: &[(&str, DataType, bool)],
4523 field_defaults: Vec<Option<AvroLiteral>>,
4524 default_injections: Vec<(usize, AvroLiteral)>,
4525 ) -> Decoder {
4526 assert_eq!(
4527 field_defaults.len(),
4528 reader_fields.len(),
4529 "field_defaults must have one entry per reader field"
4530 );
4531 let mut field_refs: Vec<FieldRef> = Vec::with_capacity(reader_fields.len());
4532 let mut encodings: Vec<Decoder> = Vec::with_capacity(reader_fields.len());
4533 for (name, dt, nullable) in reader_fields {
4534 field_refs.push(Arc::new(ArrowField::new(*name, dt.clone(), *nullable)));
4535 let enc = match dt {
4536 DataType::Int32 => Decoder::Int32(Vec::with_capacity(DEFAULT_CAPACITY)),
4537 DataType::Int64 => Decoder::Int64(Vec::with_capacity(DEFAULT_CAPACITY)),
4538 DataType::Utf8 => Decoder::String(
4539 OffsetBufferBuilder::new(DEFAULT_CAPACITY),
4540 Vec::with_capacity(DEFAULT_CAPACITY),
4541 ),
4542 other => panic!("Unsupported test field type in helper: {other:?}"),
4543 };
4544 encodings.push(enc);
4545 }
4546 let fields: Fields = field_refs.into();
4547 let projector = Projector {
4548 writer_projections: vec![],
4549 default_injections: Arc::from(default_injections),
4550 };
4551 Decoder::Record(fields, encodings, field_defaults, Some(projector))
4552 }
4553
#[cfg(feature = "avro_custom_types")]
#[test]
fn test_default_append_custom_integer_range_validation() {
    // For each narrow integer decoder: both bounds are accepted as defaults, one
    // step past either bound is rejected, and the flushed array holds only the
    // two accepted values.

    // i8 (defaults supplied as `AvroLiteral::Int`)
    let mut i8_decoder = Decoder::Int8(Vec::with_capacity(DEFAULT_CAPACITY));
    for accepted in [i8::MIN as i32, i8::MAX as i32] {
        i8_decoder
            .append_default(&AvroLiteral::Int(accepted))
            .unwrap();
    }
    for rejected in [i8::MAX as i32 + 1, i8::MIN as i32 - 1] {
        let err = i8_decoder
            .append_default(&AvroLiteral::Int(rejected))
            .unwrap_err();
        assert!(err.to_string().contains("out of range for i8"));
    }
    let i8_array = i8_decoder.flush(None).unwrap();
    let i8_values = i8_array.as_any().downcast_ref::<Int8Array>().unwrap();
    assert_eq!(i8_values.values(), &[i8::MIN, i8::MAX]);

    // i16 (defaults supplied as `AvroLiteral::Int`)
    let mut i16_decoder = Decoder::Int16(Vec::with_capacity(DEFAULT_CAPACITY));
    for accepted in [i16::MIN as i32, i16::MAX as i32] {
        i16_decoder
            .append_default(&AvroLiteral::Int(accepted))
            .unwrap();
    }
    for rejected in [i16::MAX as i32 + 1, i16::MIN as i32 - 1] {
        let err = i16_decoder
            .append_default(&AvroLiteral::Int(rejected))
            .unwrap_err();
        assert!(err.to_string().contains("out of range for i16"));
    }
    let i16_array = i16_decoder.flush(None).unwrap();
    let i16_values = i16_array.as_any().downcast_ref::<Int16Array>().unwrap();
    assert_eq!(i16_values.values(), &[i16::MIN, i16::MAX]);

    // u8 (defaults supplied as `AvroLiteral::Int`)
    let mut u8_decoder = Decoder::UInt8(Vec::with_capacity(DEFAULT_CAPACITY));
    for accepted in [0, u8::MAX as i32] {
        u8_decoder
            .append_default(&AvroLiteral::Int(accepted))
            .unwrap();
    }
    for rejected in [-1, u8::MAX as i32 + 1] {
        let err = u8_decoder
            .append_default(&AvroLiteral::Int(rejected))
            .unwrap_err();
        assert!(err.to_string().contains("out of range for u8"));
    }
    let u8_array = u8_decoder.flush(None).unwrap();
    let u8_values = u8_array.as_any().downcast_ref::<UInt8Array>().unwrap();
    assert_eq!(u8_values.values(), &[0, u8::MAX]);

    // u16 (defaults supplied as `AvroLiteral::Int`)
    let mut u16_decoder = Decoder::UInt16(Vec::with_capacity(DEFAULT_CAPACITY));
    for accepted in [0, u16::MAX as i32] {
        u16_decoder
            .append_default(&AvroLiteral::Int(accepted))
            .unwrap();
    }
    for rejected in [-1, u16::MAX as i32 + 1] {
        let err = u16_decoder
            .append_default(&AvroLiteral::Int(rejected))
            .unwrap_err();
        assert!(err.to_string().contains("out of range for u16"));
    }
    let u16_array = u16_decoder.flush(None).unwrap();
    let u16_values = u16_array.as_any().downcast_ref::<UInt16Array>().unwrap();
    assert_eq!(u16_values.values(), &[0, u16::MAX]);

    // u32 (defaults supplied as `AvroLiteral::Long`)
    let mut u32_decoder = Decoder::UInt32(Vec::with_capacity(DEFAULT_CAPACITY));
    for accepted in [0, u32::MAX as i64] {
        u32_decoder
            .append_default(&AvroLiteral::Long(accepted))
            .unwrap();
    }
    for rejected in [-1, u32::MAX as i64 + 1] {
        let err = u32_decoder
            .append_default(&AvroLiteral::Long(rejected))
            .unwrap_err();
        assert!(err.to_string().contains("out of range for u32"));
    }
    let u32_array = u32_decoder.flush(None).unwrap();
    let u32_values = u32_array.as_any().downcast_ref::<UInt32Array>().unwrap();
    assert_eq!(u32_values.values(), &[0, u32::MAX]);
}
4637
#[cfg(feature = "avro_custom_types")]
#[test]
fn test_decode_custom_integer_range_validation() {
    // For each narrow integer decoder: both bounds decode successfully, one step
    // past either bound is rejected, and the flushed array holds only the two
    // accepted values.

    // i8 (encoded as Avro int)
    let mut i8_decoder = Decoder::try_new(&avro_from_codec(Codec::Int8)).unwrap();
    for accepted in [i8::MIN as i32, i8::MAX as i32] {
        i8_decoder
            .decode(&mut AvroCursor::new(&encode_avro_int(accepted)))
            .unwrap();
    }
    for rejected in [i8::MAX as i32 + 1, i8::MIN as i32 - 1] {
        let err = i8_decoder
            .decode(&mut AvroCursor::new(&encode_avro_int(rejected)))
            .unwrap_err();
        assert!(err.to_string().contains("out of range for i8"));
    }
    let i8_array = i8_decoder.flush(None).unwrap();
    let i8_values = i8_array.as_any().downcast_ref::<Int8Array>().unwrap();
    assert_eq!(i8_values.values(), &[i8::MIN, i8::MAX]);

    // i16 (encoded as Avro int)
    let mut i16_decoder = Decoder::try_new(&avro_from_codec(Codec::Int16)).unwrap();
    for accepted in [i16::MIN as i32, i16::MAX as i32] {
        i16_decoder
            .decode(&mut AvroCursor::new(&encode_avro_int(accepted)))
            .unwrap();
    }
    for rejected in [i16::MAX as i32 + 1, i16::MIN as i32 - 1] {
        let err = i16_decoder
            .decode(&mut AvroCursor::new(&encode_avro_int(rejected)))
            .unwrap_err();
        assert!(err.to_string().contains("out of range for i16"));
    }
    let i16_array = i16_decoder.flush(None).unwrap();
    let i16_values = i16_array.as_any().downcast_ref::<Int16Array>().unwrap();
    assert_eq!(i16_values.values(), &[i16::MIN, i16::MAX]);

    // u8 (encoded as Avro int)
    let mut u8_decoder = Decoder::try_new(&avro_from_codec(Codec::UInt8)).unwrap();
    for accepted in [0, u8::MAX as i32] {
        u8_decoder
            .decode(&mut AvroCursor::new(&encode_avro_int(accepted)))
            .unwrap();
    }
    for rejected in [-1, u8::MAX as i32 + 1] {
        let err = u8_decoder
            .decode(&mut AvroCursor::new(&encode_avro_int(rejected)))
            .unwrap_err();
        assert!(err.to_string().contains("out of range for u8"));
    }
    let u8_array = u8_decoder.flush(None).unwrap();
    let u8_values = u8_array.as_any().downcast_ref::<UInt8Array>().unwrap();
    assert_eq!(u8_values.values(), &[0, u8::MAX]);

    // u16 (encoded as Avro int)
    let mut u16_decoder = Decoder::try_new(&avro_from_codec(Codec::UInt16)).unwrap();
    for accepted in [0, u16::MAX as i32] {
        u16_decoder
            .decode(&mut AvroCursor::new(&encode_avro_int(accepted)))
            .unwrap();
    }
    for rejected in [-1, u16::MAX as i32 + 1] {
        let err = u16_decoder
            .decode(&mut AvroCursor::new(&encode_avro_int(rejected)))
            .unwrap_err();
        assert!(err.to_string().contains("out of range for u16"));
    }
    let u16_array = u16_decoder.flush(None).unwrap();
    let u16_values = u16_array.as_any().downcast_ref::<UInt16Array>().unwrap();
    assert_eq!(u16_values.values(), &[0, u16::MAX]);

    // u32 (encoded as Avro long)
    let mut u32_decoder = Decoder::try_new(&avro_from_codec(Codec::UInt32)).unwrap();
    for accepted in [0, u32::MAX as i64] {
        u32_decoder
            .decode(&mut AvroCursor::new(&encode_avro_long(accepted)))
            .unwrap();
    }
    for rejected in [-1, u32::MAX as i64 + 1] {
        let err = u32_decoder
            .decode(&mut AvroCursor::new(&encode_avro_long(rejected)))
            .unwrap_err();
        assert!(err.to_string().contains("out of range for u32"));
    }
    let u32_array = u32_decoder.flush(None).unwrap();
    let u32_values = u32_array.as_any().downcast_ref::<UInt32Array>().unwrap();
    assert_eq!(u32_values.values(), &[0, u32::MAX]);
}
4732
#[test]
fn test_default_append_int32_and_int64_from_int_and_long() {
    // An Int32 decoder takes an `Int` default.
    let mut int_decoder = Decoder::Int32(Vec::with_capacity(DEFAULT_CAPACITY));
    int_decoder.append_default(&AvroLiteral::Int(42)).unwrap();
    let int_array = int_decoder.flush(None).unwrap();
    let ints = int_array.as_any().downcast_ref::<Int32Array>().unwrap();
    assert_eq!(ints.len(), 1);
    assert_eq!(ints.value(0), 42);

    // An Int64 decoder takes both `Int` and `Long` defaults.
    let mut long_decoder = Decoder::Int64(Vec::with_capacity(DEFAULT_CAPACITY));
    for default in [AvroLiteral::Int(5), AvroLiteral::Long(7)] {
        long_decoder.append_default(&default).unwrap();
    }
    let long_array = long_decoder.flush(None).unwrap();
    let longs = long_array.as_any().downcast_ref::<Int64Array>().unwrap();
    assert_eq!(longs.len(), 2);
    assert_eq!(longs.value(0), 5);
    assert_eq!(longs.value(1), 7);
}
4750
#[test]
fn test_default_append_floats_and_doubles() {
    // A Float32 decoder takes a `Float` default.
    let mut float_decoder = Decoder::Float32(Vec::with_capacity(DEFAULT_CAPACITY));
    float_decoder
        .append_default(&AvroLiteral::Float(1.5))
        .unwrap();
    let float_array = float_decoder.flush(None).unwrap();
    let floats = float_array
        .as_any()
        .downcast_ref::<Float32Array>()
        .unwrap();
    assert_eq!(floats.value(0), 1.5);

    // A Float64 decoder takes a `Double` default.
    let mut double_decoder = Decoder::Float64(Vec::with_capacity(DEFAULT_CAPACITY));
    double_decoder
        .append_default(&AvroLiteral::Double(2.25))
        .unwrap();
    let double_array = double_decoder.flush(None).unwrap();
    let doubles = double_array
        .as_any()
        .downcast_ref::<Float64Array>()
        .unwrap();
    assert_eq!(doubles.value(0), 2.25);
}
4764
#[test]
fn test_default_append_string_and_bytes() {
    // A string decoder takes a `String` default.
    let mut string_decoder = Decoder::String(
        OffsetBufferBuilder::new(DEFAULT_CAPACITY),
        Vec::with_capacity(DEFAULT_CAPACITY),
    );
    let string_default = AvroLiteral::String("hi".into());
    string_decoder.append_default(&string_default).unwrap();
    let string_array = string_decoder.flush(None).unwrap();
    let strings = string_array
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    assert_eq!(strings.value(0), "hi");

    // A binary decoder takes a `Bytes` default.
    let mut binary_decoder = Decoder::Binary(
        OffsetBufferBuilder::new(DEFAULT_CAPACITY),
        Vec::with_capacity(DEFAULT_CAPACITY),
    );
    let bytes_default = AvroLiteral::Bytes(vec![1, 2, 3]);
    binary_decoder.append_default(&bytes_default).unwrap();
    let binary_array = binary_decoder.flush(None).unwrap();
    let binaries = binary_array
        .as_any()
        .downcast_ref::<BinaryArray>()
        .unwrap();
    assert_eq!(binaries.value(0), &[1, 2, 3]);

    // A string decoder refuses a `Bytes` default.
    let mut rejecting_decoder = Decoder::String(
        OffsetBufferBuilder::new(DEFAULT_CAPACITY),
        Vec::with_capacity(DEFAULT_CAPACITY),
    );
    let mismatched_default = AvroLiteral::Bytes(vec![0x61, 0x62]);
    let err = rejecting_decoder
        .append_default(&mismatched_default)
        .unwrap_err();
    assert!(
        err.to_string()
            .contains("Default for string must be string"),
        "unexpected error: {err:?}"
    );
}
4800
#[test]
fn test_default_append_nullable_int32_null_and_value() {
    // A nullable Int32 decoder takes a null default followed by an int default.
    let plan = NullablePlan::ReadTag {
        nullability: Nullability::NullFirst,
        resolution: ResolutionPlan::Promotion(Promotion::Direct),
    };
    let mut decoder = Decoder::Nullable(
        plan,
        NullBufferBuilder::new(DEFAULT_CAPACITY),
        Box::new(Decoder::Int32(Vec::with_capacity(DEFAULT_CAPACITY))),
    );
    for default in [AvroLiteral::Null, AvroLiteral::Int(11)] {
        decoder.append_default(&default).unwrap();
    }
    let flushed = decoder.flush(None).unwrap();
    let ints = flushed.as_any().downcast_ref::<Int32Array>().unwrap();
    assert_eq!(ints.len(), 2);
    assert!(ints.is_null(0));
    assert_eq!(ints.value(1), 11);
}
4820
#[test]
fn test_default_append_array_of_ints() {
    // An array default of three ints becomes a single list row with three values.
    let item_type = avro_from_codec(Codec::Int32);
    let list_type = avro_from_codec(Codec::List(Arc::new(item_type)));
    let mut decoder = Decoder::try_new(&list_type).unwrap();
    let default = AvroLiteral::Array((1..=3).map(AvroLiteral::Int).collect());
    decoder.append_default(&default).unwrap();
    let flushed = decoder.flush(None).unwrap();
    let lists = flushed.as_any().downcast_ref::<ListArray>().unwrap();
    assert_eq!(lists.len(), 1);
    assert_eq!(lists.value_length(0), 3);
    let items = lists
        .values()
        .as_any()
        .downcast_ref::<Int32Array>()
        .unwrap();
    assert_eq!(items.values(), &[1, 2, 3]);
}
4838
#[test]
fn test_default_append_map_string_to_int() {
    // A map default with two entries becomes a single map row with two entries.
    let value_type = avro_from_codec(Codec::Int32);
    let map_type = avro_from_codec(Codec::Map(Arc::new(value_type)));
    let mut decoder = Decoder::try_new(&map_type).unwrap();
    let mut default_entries: IndexMap<String, AvroLiteral> = IndexMap::new();
    for (key, value) in [("k1", 10), ("k2", 20)] {
        default_entries.insert(key.to_string(), AvroLiteral::Int(value));
    }
    decoder
        .append_default(&AvroLiteral::Map(default_entries))
        .unwrap();
    let flushed = decoder.flush(None).unwrap();
    let maps = flushed.as_any().downcast_ref::<MapArray>().unwrap();
    assert_eq!(maps.len(), 1);
    assert_eq!(maps.value_length(0), 2);
    let first_row = maps.value(0);
    let entries = first_row.as_any().downcast_ref::<StructArray>().unwrap();
    let key_column = entries.column_by_name("key").unwrap();
    let k = key_column
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    let value_column = entries.column_by_name("value").unwrap();
    let v = value_column
        .as_any()
        .downcast_ref::<Int32Array>()
        .unwrap();
    // Keys and values are compared as sets, so entry order is not checked here.
    let actual_keys: std::collections::HashSet<&str> =
        (0..k.len()).map(|i| k.value(i)).collect();
    let expected_keys: std::collections::HashSet<&str> = ["k1", "k2"].into_iter().collect();
    assert_eq!(actual_keys, expected_keys);
    let actual_values: std::collections::HashSet<i32> =
        (0..v.len()).map(|i| v.value(i)).collect();
    let expected_values: std::collections::HashSet<i32> = [10, 20].into_iter().collect();
    assert_eq!(actual_values, expected_values);
}
4870
/// An enum default given by symbol name (`"B"`) is stored as dictionary key 1, and the
/// dictionary values expose `"B"` at that index.
#[test]
fn test_default_append_enum_by_symbol() {
    let symbols: Arc<[String]> = ["A", "B", "C"].into_iter().map(String::from).collect();
    let mut decoder = Decoder::Enum(Vec::with_capacity(DEFAULT_CAPACITY), symbols, None);
    decoder
        .append_default(&AvroLiteral::Enum("B".into()))
        .unwrap();
    let flushed = decoder.flush(None).unwrap();
    let dict = flushed
        .as_any()
        .downcast_ref::<DictionaryArray<Int32Type>>()
        .unwrap();
    assert_eq!(dict.len(), 1);
    let expected_keys = Int32Array::from(vec![1]);
    assert_eq!(dict.keys(), &expected_keys);
    let dict_values = dict.values();
    let dict_values = dict_values.as_any().downcast_ref::<StringArray>().unwrap();
    assert_eq!(dict_values.value(1), "B");
}
4891
/// A UUID default supplied as a string flushes to a 16-byte fixed-size binary value,
/// while a bytes literal is rejected with a "must be string" error.
#[test]
fn test_default_append_uuid_and_type_error() {
    // Happy path: string literal.
    let mut ok_decoder = Decoder::Uuid(Vec::with_capacity(DEFAULT_CAPACITY));
    let literal = AvroLiteral::String("123e4567-e89b-12d3-a456-426614174000".into());
    ok_decoder.append_default(&literal).unwrap();
    let flushed = ok_decoder.flush(None).unwrap();
    let fixed = flushed
        .as_any()
        .downcast_ref::<FixedSizeBinaryArray>()
        .unwrap();
    assert_eq!(fixed.value_length(), 16);
    assert_eq!(fixed.len(), 1);

    // Error path: bytes literal is the wrong kind for a uuid default.
    let mut bad_decoder = Decoder::Uuid(Vec::with_capacity(DEFAULT_CAPACITY));
    let wrong_kind = AvroLiteral::Bytes(vec![0u8; 16]);
    let err = bad_decoder.append_default(&wrong_kind).unwrap_err();
    let rendered = err.to_string();
    assert!(
        rendered.contains("Default for uuid must be string"),
        "unexpected error: {err:?}"
    );
}
4914
/// A 4-byte default round-trips through a `Fixed(4)` decoder, and a 3-byte default is
/// rejected with a "Fixed default length" error.
#[test]
fn test_default_append_fixed_and_length_mismatch() {
    // Matching length is accepted and preserved.
    let mut ok_decoder = Decoder::Fixed(4, Vec::with_capacity(DEFAULT_CAPACITY));
    let exact = AvroLiteral::Bytes(vec![1, 2, 3, 4]);
    ok_decoder.append_default(&exact).unwrap();
    let flushed = ok_decoder.flush(None).unwrap();
    let fixed = flushed
        .as_any()
        .downcast_ref::<FixedSizeBinaryArray>()
        .unwrap();
    assert_eq!(fixed.value_length(), 4);
    assert_eq!(fixed.value(0), &[1, 2, 3, 4]);

    // One byte short must fail.
    let mut bad_decoder = Decoder::Fixed(4, Vec::with_capacity(DEFAULT_CAPACITY));
    let too_short = AvroLiteral::Bytes(vec![1, 2, 3]);
    let err = bad_decoder.append_default(&too_short).unwrap_err();
    let rendered = err.to_string();
    assert!(
        rendered.contains("Fixed default length"),
        "unexpected error: {err:?}"
    );
}
4936
/// A 12-byte duration default (three little-endian u32 values 1, 2, 3) flushes to an
/// interval with months = 1, days = 2 and nanoseconds = 3_000_000; an 11-byte default
/// is rejected.
#[test]
fn test_default_append_duration_and_length_validation() {
    let interval_type = avro_from_codec(Codec::Interval);
    let mut decoder = Decoder::try_new(&interval_type).unwrap();
    let payload: Vec<u8> = [1u32, 2u32, 3u32]
        .iter()
        .flat_map(|part| part.to_le_bytes())
        .collect();
    decoder.append_default(&AvroLiteral::Bytes(payload)).unwrap();
    let flushed = decoder.flush(None).unwrap();
    let intervals = flushed
        .as_any()
        .downcast_ref::<IntervalMonthDayNanoArray>()
        .unwrap();
    assert_eq!(intervals.len(), 1);
    let first = intervals.value(0);
    assert_eq!(first.months, 1);
    assert_eq!(first.days, 2);
    assert_eq!(first.nanoseconds, 3_000_000);

    // Anything other than exactly 12 bytes must be refused.
    let mut bad_decoder = Decoder::try_new(&avro_from_codec(Codec::Interval)).unwrap();
    let too_short = AvroLiteral::Bytes(vec![0u8; 11]);
    let err = bad_decoder.append_default(&too_short).unwrap_err();
    let rendered = err.to_string();
    assert!(
        rendered.contains("Duration default must be exactly 12 bytes"),
        "unexpected error: {err:?}"
    );
}
4966
/// 32-byte big-endian two's-complement defaults are decoded into a Decimal256 column
/// with scale 2: `0x…3039` renders as "123.45" and `0xFF…85` renders as "-1.23".
#[test]
fn test_default_append_decimal256_from_bytes() {
    let decimal_type = avro_from_codec(Codec::Decimal(50, Some(2), Some(32)));
    let mut decoder = Decoder::try_new(&decimal_type).unwrap();

    // 0x3039 == 12345, zero-extended to 32 bytes.
    let mut positive = [0x00_u8; 32];
    positive[30] = 0x30;
    positive[31] = 0x39;
    decoder
        .append_default(&AvroLiteral::Bytes(positive.to_vec()))
        .unwrap();

    // 0xFF…85 == -123, sign-extended to 32 bytes.
    let mut negative = [0xFF_u8; 32];
    negative[31] = 0x85;
    decoder
        .append_default(&AvroLiteral::Bytes(negative.to_vec()))
        .unwrap();

    let flushed = decoder.flush(None).unwrap();
    let decimals = flushed.as_any().downcast_ref::<Decimal256Array>().unwrap();
    assert_eq!(decimals.len(), 2);
    assert_eq!(decimals.value_as_string(0), "123.45");
    assert_eq!(decimals.value_as_string(1), "-1.23");
}
4989
/// A record default given as a map that only supplies `a` must fall back to the
/// per-field default (`"hi"`) for the missing field `b`.
#[test]
fn test_record_append_default_map_missing_fields_uses_projector_field_defaults() {
    let per_field_defaults = vec![None, Some(AvroLiteral::String("hi".into()))];
    let mut record = make_record_decoder_with_projector_defaults(
        &[("a", DataType::Int32, false), ("b", DataType::Utf8, false)],
        per_field_defaults,
        vec![],
    );
    let mut partial: IndexMap<String, AvroLiteral> = IndexMap::new();
    partial.insert(String::from("a"), AvroLiteral::Int(7));
    record.append_default(&AvroLiteral::Map(partial)).unwrap();

    let flushed = record.flush(None).unwrap();
    let row = flushed.as_any().downcast_ref::<StructArray>().unwrap();
    let col_a = row.column_by_name("a").unwrap();
    let col_a = col_a.as_any().downcast_ref::<Int32Array>().unwrap();
    let col_b = row.column_by_name("b").unwrap();
    let col_b = col_b.as_any().downcast_ref::<StringArray>().unwrap();
    assert_eq!(col_a.value(0), 7);
    assert_eq!(col_b.value(0), "hi");
}
5018
/// Appending a `Null` default to a record decoder populates every field from its own
/// per-field default (`5` for `a`, `"x"` for `b`).
#[test]
fn test_record_append_default_null_uses_projector_field_defaults() {
    let per_field_defaults = vec![
        Some(AvroLiteral::Int(5)),
        Some(AvroLiteral::String("x".into())),
    ];
    let mut record = make_record_decoder_with_projector_defaults(
        &[("a", DataType::Int32, false), ("b", DataType::Utf8, false)],
        per_field_defaults,
        vec![],
    );
    record.append_default(&AvroLiteral::Null).unwrap();

    let flushed = record.flush(None).unwrap();
    let row = flushed.as_any().downcast_ref::<StructArray>().unwrap();
    let col_a = row.column_by_name("a").unwrap();
    let col_a = col_a.as_any().downcast_ref::<Int32Array>().unwrap();
    let col_b = row.column_by_name("b").unwrap();
    let col_b = col_b.as_any().downcast_ref::<StringArray>().unwrap();
    assert_eq!(col_a.value(0), 5);
    assert_eq!(col_b.value(0), "x");
}
5048
/// With no per-field defaults and no projector injections, a record default map that
/// only supplies `a` leaves the nullable field `b` null while `a` takes the given value.
#[test]
fn test_record_append_default_missing_fields_without_projector_defaults_yields_type_nulls_or_empties()
{
    let field_refs: Vec<FieldRef> = [("a", DataType::Int32, true), ("b", DataType::Utf8, true)]
        .into_iter()
        .map(|(name, data_type, nullable)| Arc::new(ArrowField::new(name, data_type, nullable)))
        .collect();

    // Both children are null-second nullable wrappers around a plain value decoder.
    let null_second_plan = || NullablePlan::ReadTag {
        nullability: Nullability::NullSecond,
        resolution: ResolutionPlan::Promotion(Promotion::Direct),
    };
    let int_child = Decoder::Nullable(
        null_second_plan(),
        NullBufferBuilder::new(DEFAULT_CAPACITY),
        Box::new(Decoder::Int32(Vec::with_capacity(DEFAULT_CAPACITY))),
    );
    let string_child = Decoder::Nullable(
        null_second_plan(),
        NullBufferBuilder::new(DEFAULT_CAPACITY),
        Box::new(Decoder::String(
            OffsetBufferBuilder::new(DEFAULT_CAPACITY),
            Vec::with_capacity(DEFAULT_CAPACITY),
        )),
    );
    let children: Vec<Decoder> = vec![int_child, string_child];

    let projector = Projector {
        writer_projections: vec![],
        default_injections: Arc::from(Vec::<(usize, AvroLiteral)>::new()),
    };
    let mut record = Decoder::Record(
        field_refs.into(),
        children,
        vec![None, None],
        Some(projector),
    );

    let mut partial: IndexMap<String, AvroLiteral> = IndexMap::new();
    partial.insert(String::from("a"), AvroLiteral::Int(9));
    record.append_default(&AvroLiteral::Map(partial)).unwrap();

    let flushed = record.flush(None).unwrap();
    let row = flushed.as_any().downcast_ref::<StructArray>().unwrap();
    let col_a = row.column_by_name("a").unwrap();
    let col_a = col_a.as_any().downcast_ref::<Int32Array>().unwrap();
    let col_b = row.column_by_name("b").unwrap();
    let col_b = col_b.as_any().downcast_ref::<StringArray>().unwrap();
    assert!(col_a.is_valid(0));
    assert_eq!(col_a.value(0), 9);
    assert!(col_b.is_null(0));
}
5106
/// Decoding from an empty input with projector default injections for both fields fills
/// `id` with 99 and `name` with "alice".
#[test]
fn test_projector_default_injection_when_writer_lacks_fields() {
    let injected = vec![
        (0, AvroLiteral::Int(99)),
        (1, AvroLiteral::String("alice".into())),
    ];
    let mut record = make_record_decoder_with_projector_defaults(
        &[
            ("id", DataType::Int32, false),
            ("name", DataType::Utf8, false),
        ],
        vec![None, None],
        injected,
    );
    // The cursor is empty: every value must come from the injections.
    record.decode(&mut AvroCursor::new(&[])).unwrap();

    let flushed = record.flush(None).unwrap();
    let row = flushed.as_any().downcast_ref::<StructArray>().unwrap();
    let id_column = row.column_by_name("id").unwrap();
    let id_column = id_column.as_any().downcast_ref::<Int32Array>().unwrap();
    let name_column = row.column_by_name("name").unwrap();
    let name_column = name_column.as_any().downcast_ref::<StringArray>().unwrap();
    assert_eq!(id_column.value(0), 99);
    assert_eq!(name_column.value(0), "alice");
}
5140
/// Union type ids are taken from `UnionFields` (42 and 7 here), not from the branch
/// position: writer branch 1 maps to type id 7 and branch 0 maps to type id 42.
#[test]
fn union_type_ids_are_not_child_indexes() {
    let branch_types: Vec<AvroDataType> =
        vec![avro_from_codec(Codec::Int32), avro_from_codec(Codec::Utf8)];
    let int_field = Arc::new(ArrowField::new("a", DataType::Int32, true));
    let utf8_field = Arc::new(ArrowField::new("b", DataType::Utf8, true));
    let fields: UnionFields = [(42_i8, int_field), (7_i8, utf8_field)]
        .into_iter()
        .collect();
    let union_type = avro_from_codec(Codec::Union(
        branch_types.into(),
        fields.clone(),
        UnionMode::Dense,
    ));
    let mut decoder = Decoder::try_new(&union_type).expect("decoder");

    // First row: branch 1 (string "hi").
    let mut string_row = encode_avro_long(1);
    string_row.extend(encode_avro_bytes("hi".as_bytes()));
    decoder
        .decode(&mut AvroCursor::new(&string_row))
        .expect("decode b1");

    // Second row: branch 0 (int 5).
    let mut int_row = encode_avro_long(0);
    int_row.extend(encode_avro_int(5));
    decoder
        .decode(&mut AvroCursor::new(&int_row))
        .expect("decode b0");

    let flushed = decoder.flush(None).expect("flush");
    let union = flushed
        .as_any()
        .downcast_ref::<UnionArray>()
        .expect("union");
    assert_eq!(union.len(), 2);
    assert_eq!(union.type_id(0), 7, "type id must come from UnionFields");
    assert_eq!(union.type_id(1), 42, "type id must come from UnionFields");
    assert_eq!(union.value_offset(0), 0);
    assert_eq!(union.value_offset(1), 0);

    let strings = union.child(7);
    let strings = strings.as_any().downcast_ref::<StringArray>().unwrap();
    assert_eq!(strings.len(), 1);
    assert_eq!(strings.value(0), "hi");

    let ints = union.child(42);
    let ints = ints.as_any().downcast_ref::<Int32Array>().unwrap();
    assert_eq!(ints.len(), 1);
    assert_eq!(ints.value(0), 5);

    let declared_ids: Vec<i8> = fields.iter().map(|(tid, _)| tid).collect();
    assert_eq!(declared_ids, vec![42_i8, 7_i8]);
}
5179
/// Each of the four custom duration codecs must be planned as `Skipper::Int64`.
#[cfg(feature = "avro_custom_types")]
#[test]
fn skipper_from_avro_maps_custom_duration_variants_to_int64() -> Result<(), AvroError> {
    let duration_codecs = [
        Codec::DurationNanos,
        Codec::DurationMicros,
        Codec::DurationMillis,
        Codec::DurationSeconds,
    ];
    for codec in duration_codecs {
        let avro_type = make_avro_dt(codec.clone(), None);
        let skipper = Skipper::from_avro(&avro_type)?;
        assert!(
            matches!(skipper, Skipper::Int64),
            "expected Int64 skipper for {:?}, got {:?}",
            codec,
            skipper
        );
    }
    Ok(())
}
5198
/// For every custom duration codec, skipping one value must advance the cursor past
/// exactly the bytes of one encoded long, across a range of magnitudes and signs.
#[cfg(feature = "avro_custom_types")]
#[test]
fn skipper_skip_consumes_one_long_for_custom_durations() -> Result<(), AvroError> {
    let samples: [i64; 7] = [0, 1, -1, 150, -150, i64::MAX / 3, i64::MIN / 3];
    let duration_codecs = [
        Codec::DurationNanos,
        Codec::DurationMicros,
        Codec::DurationMillis,
        Codec::DurationSeconds,
    ];
    for codec in duration_codecs {
        let avro_type = make_avro_dt(codec.clone(), None);
        let skipper = Skipper::from_avro(&avro_type)?;
        for sample in samples.iter().copied() {
            let encoded = encode_avro_long(sample);
            let mut cursor = AvroCursor::new(&encoded);
            skipper.skip(&mut cursor)?;
            assert_eq!(
                cursor.position(),
                encoded.len(),
                "did not consume all bytes for {:?} value {}",
                codec,
                sample
            );
        }
    }
    Ok(())
}
5226
/// A null-first nullable duration is planned as `Nullable(NullFirst, Int64)`; tag 0
/// consumes only the tag byte, and tag 1 consumes the tag plus one encoded long.
#[cfg(feature = "avro_custom_types")]
#[test]
fn skipper_nullable_custom_duration_respects_null_first() -> Result<(), AvroError> {
    let avro_type = make_avro_dt(Codec::DurationNanos, Some(Nullability::NullFirst));
    let skipper = Skipper::from_avro(&avro_type)?;
    match &skipper {
        Skipper::Nullable(Nullability::NullFirst, inner) => assert!(
            matches!(**inner, Skipper::Int64),
            "expected inner Int64, got {:?}",
            inner
        ),
        other => panic!("expected Nullable(NullFirst, Int64), got {:?}", other),
    }

    // Tag only.
    let null_row = encode_vlq_u64(0);
    let mut null_cursor = AvroCursor::new(&null_row);
    skipper.skip(&mut null_cursor)?;
    assert_eq!(null_cursor.position(), 1, "expected to consume only tag=0");

    // Tag followed by a value.
    let mut value_row = encode_vlq_u64(1);
    value_row.extend(encode_avro_long(0));
    let mut value_cursor = AvroCursor::new(&value_row);
    skipper.skip(&mut value_cursor)?;
    assert_eq!(
        value_cursor.position(),
        2,
        "expected to consume tag=1 + long(0)"
    );

    Ok(())
}
5255
/// A null-second nullable duration is planned as `Nullable(NullSecond, Int64)`; tag 1
/// consumes only the tag byte, and tag 0 consumes the tag plus one encoded long.
#[cfg(feature = "avro_custom_types")]
#[test]
fn skipper_nullable_custom_duration_respects_null_second() -> Result<(), AvroError> {
    let avro_type = make_avro_dt(Codec::DurationMicros, Some(Nullability::NullSecond));
    let skipper = Skipper::from_avro(&avro_type)?;
    match &skipper {
        Skipper::Nullable(Nullability::NullSecond, inner) => assert!(
            matches!(**inner, Skipper::Int64),
            "expected inner Int64, got {:?}",
            inner
        ),
        other => panic!("expected Nullable(NullSecond, Int64), got {:?}", other),
    }

    // Tag only.
    let null_row = encode_vlq_u64(1);
    let mut null_cursor = AvroCursor::new(&null_row);
    skipper.skip(&mut null_cursor)?;
    assert_eq!(null_cursor.position(), 1, "expected to consume only tag=1");

    // Tag followed by a value.
    let mut value_row = encode_vlq_u64(0);
    value_row.extend(encode_avro_long(-1));
    let mut value_cursor = AvroCursor::new(&value_row);
    skipper.skip(&mut value_cursor)?;
    let expected_position = 1 + encode_avro_long(-1).len();
    assert_eq!(
        value_cursor.position(),
        expected_position,
        "expected to consume tag=0 + long(-1)"
    );
    Ok(())
}
5287
/// The interval codec is planned as `Skipper::DurationFixed12`, and skipping one value
/// advances the cursor by exactly 12 bytes.
#[test]
fn skipper_interval_is_fixed12_and_skips_12_bytes() -> Result<(), AvroError> {
    let avro_type = make_avro_dt(Codec::Interval, None);
    let skipper = Skipper::from_avro(&avro_type)?;
    assert!(
        matches!(skipper, Skipper::DurationFixed12),
        "expected DurationFixed12, got {:?}",
        skipper
    );
    let zeros = vec![0u8; 12];
    let mut cursor = AvroCursor::new(&zeros);
    skipper.skip(&mut cursor)?;
    assert_eq!(cursor.position(), 12, "expected to consume 12 fixed bytes");
    Ok(())
}
5302
/// With a run-end width of 16, nine ints `1,1,1,2,2,3,3,3,3` flush to a
/// `RunArray<Int16Type>` with run ends `[3, 5, 9]` and values `[1, 2, 3]`.
#[cfg(feature = "avro_custom_types")]
#[test]
fn test_run_end_encoded_width16_int32_basic_grouping() {
    use arrow_array::RunArray;
    use std::sync::Arc;
    let value_type = Arc::new(avro_from_codec(Codec::Int32));
    let ree_type = AvroDataType::new(
        Codec::RunEndEncoded(value_type, 16),
        Default::default(),
        None,
    );
    let mut decoder = Decoder::try_new(&ree_type).expect("create REE decoder");
    for value in [1, 1, 1, 2, 2, 3, 3, 3, 3] {
        let encoded = encode_avro_int(value);
        let mut cursor = AvroCursor::new(&encoded);
        decoder.decode(&mut cursor).expect("decode");
    }
    let flushed = decoder.flush(None).expect("flush");
    let runs = flushed
        .as_any()
        .downcast_ref::<RunArray<Int16Type>>()
        .expect("RunArray<Int16Type>");
    assert_eq!(runs.len(), 9);
    assert_eq!(runs.run_ends().values(), &[3, 5, 9]);
    let run_values = runs.values().as_ref().as_any();
    let run_values = run_values
        .downcast_ref::<Int32Array>()
        .expect("values Int32");
    assert_eq!(run_values.values(), &[1, 2, 3]);
}
5334
/// With null-second nullable int values and a run-end width of 32, consecutive nulls
/// are grouped into runs just like equal values: the sequence
/// `null, null, 7, 7, 7, null, 5, 5` yields run ends `[2, 5, 6, 8]`.
#[cfg(feature = "avro_custom_types")]
#[test]
fn test_run_end_encoded_width32_nullable_values_group_nulls() {
    use arrow_array::RunArray;
    use std::sync::Arc;
    let value_type = AvroDataType::new(
        Codec::Int32,
        Default::default(),
        Some(Nullability::NullSecond),
    );
    let ree_type = AvroDataType::new(
        Codec::RunEndEncoded(Arc::new(value_type), 32),
        Default::default(),
        None,
    );
    let mut decoder = Decoder::try_new(&ree_type).expect("create REE decoder");
    let rows: [Option<i32>; 8] = [
        None,
        None,
        Some(7),
        Some(7),
        Some(7),
        None,
        Some(5),
        Some(5),
    ];
    for row in rows {
        // Null-second union: tag 0 carries a value, tag 1 is null.
        let mut encoded = Vec::new();
        if let Some(value) = row {
            encoded.extend_from_slice(&encode_vlq_u64(0));
            encoded.extend_from_slice(&encode_avro_int(value));
        } else {
            encoded.extend_from_slice(&encode_vlq_u64(1));
        }
        decoder
            .decode(&mut AvroCursor::new(&encoded))
            .expect("decode");
    }
    let flushed = decoder.flush(None).expect("flush");
    let runs = flushed
        .as_any()
        .downcast_ref::<RunArray<Int32Type>>()
        .expect("RunArray<Int32Type>");
    assert_eq!(runs.len(), 8);
    assert_eq!(runs.run_ends().values(), &[2, 5, 6, 8]);
    let run_values = runs.values().as_ref().as_any();
    let run_values = run_values
        .downcast_ref::<Int32Array>()
        .expect("values Int32 (nullable)");
    assert_eq!(run_values.len(), 4);
    assert!(run_values.is_null(0));
    assert_eq!(run_values.value(1), 7);
    assert!(run_values.is_null(2));
    assert_eq!(run_values.value(3), 5);
}
5391
/// A `Nullable` decoder using the `FromSingle` plan with int-to-double promotion, wrapped
/// around a hand-built `RunEndEncoded` decoder over Float64 values, must decode Avro ints
/// into Float64 runs: `1,1,2,2,2` becomes run ends `[2, 5]` with values `[1.0, 2.0]`.
#[cfg(feature = "avro_custom_types")]
#[test]
fn test_run_end_encoded_decode_with_promotion_int_to_double_via_nullable_from_single() {
    use arrow_array::RunArray;
    let float_values = Box::new(Decoder::Float64(Vec::with_capacity(DEFAULT_CAPACITY)));
    let run_end_encoded = Decoder::RunEndEncoded(8, 0, float_values);
    let plan = NullablePlan::FromSingle {
        resolution: ResolutionPlan::Promotion(Promotion::IntToDouble),
    };
    let mut decoder = Decoder::Nullable(
        plan,
        NullBufferBuilder::new(DEFAULT_CAPACITY),
        Box::new(run_end_encoded),
    );
    for value in [1, 1, 2, 2, 2] {
        let encoded = encode_avro_int(value);
        let mut cursor = AvroCursor::new(&encoded);
        decoder.decode(&mut cursor).expect("decode");
    }
    let flushed = decoder.flush(None).expect("flush");
    let runs = flushed
        .as_any()
        .downcast_ref::<RunArray<Int64Type>>()
        .expect("RunArray<Int64Type>");
    assert_eq!(runs.len(), 5);
    assert_eq!(runs.run_ends().values(), &[2, 5]);
    let run_values = runs.values().as_ref().as_any();
    let run_values = run_values
        .downcast_ref::<Float64Array>()
        .expect("values Float64");
    assert_eq!(run_values.values(), &[1.0, 2.0]);
}
5428
/// Building a decoder for a run-end-encoded type with width 3 must fail, and the error
/// text must name the problem and list the accepted widths.
#[cfg(feature = "avro_custom_types")]
#[test]
fn test_run_end_encoded_unsupported_run_end_width_errors() {
    use std::sync::Arc;
    let value_type = Arc::new(avro_from_codec(Codec::Int32));
    let bad_type = AvroDataType::new(
        Codec::RunEndEncoded(value_type, 3),
        Default::default(),
        None,
    );
    let msg = Decoder::try_new(&bad_type)
        .expect_err("must reject unsupported width")
        .to_string();
    let names_problem = msg.contains("Unsupported run-end width");
    let lists_widths = msg.contains("16/32/64 bits or 2/4/8 bytes");
    assert!(
        names_problem && lists_widths,
        "unexpected error message: {msg}"
    );
}
5447
/// Flushing a run-end-encoded decoder (width given as 4) that never decoded anything
/// yields an empty `RunArray<Int32Type>` with no run ends and no values.
#[cfg(feature = "avro_custom_types")]
#[test]
fn test_run_end_encoded_empty_input_is_empty_runarray() {
    use arrow_array::RunArray;
    use std::sync::Arc;
    let value_type = Arc::new(avro_from_codec(Codec::Utf8));
    let ree_type = AvroDataType::new(
        Codec::RunEndEncoded(value_type, 4),
        Default::default(),
        None,
    );
    let mut decoder = Decoder::try_new(&ree_type).expect("create REE decoder");
    let flushed = decoder.flush(None).expect("flush");
    let runs = flushed
        .as_any()
        .downcast_ref::<RunArray<Int32Type>>()
        .expect("RunArray<Int32Type>");
    assert_eq!(runs.len(), 0);
    assert_eq!(runs.run_ends().len(), 0);
    assert_eq!(runs.values().len(), 0);
}
5469
/// String values are grouped into runs by equality of adjacent rows only:
/// `a, a, bb, bb, bb, a` yields run ends `[2, 5, 6]` and values `a, bb, a`.
#[cfg(feature = "avro_custom_types")]
#[test]
fn test_run_end_encoded_strings_grouping_width32_bits() {
    use arrow_array::RunArray;
    use std::sync::Arc;
    let value_type = Arc::new(avro_from_codec(Codec::Utf8));
    let ree_type = AvroDataType::new(
        Codec::RunEndEncoded(value_type, 32),
        Default::default(),
        None,
    );
    let mut decoder = Decoder::try_new(&ree_type).expect("create REE decoder");
    for text in ["a", "a", "bb", "bb", "bb", "a"] {
        let encoded = encode_avro_bytes(text.as_bytes());
        let mut cursor = AvroCursor::new(&encoded);
        decoder.decode(&mut cursor).expect("decode");
    }
    let flushed = decoder.flush(None).expect("flush");
    let runs = flushed
        .as_any()
        .downcast_ref::<RunArray<Int32Type>>()
        .expect("RunArray<Int32Type>");
    assert_eq!(runs.run_ends().values(), &[2, 5, 6]);
    let run_values = runs.values().as_ref().as_any();
    let run_values = run_values
        .downcast_ref::<StringArray>()
        .expect("values String");
    assert_eq!(run_values.len(), 3);
    assert_eq!(run_values.value(0), "a");
    assert_eq!(run_values.value(1), "bb");
    assert_eq!(run_values.value(2), "a");
}
5503
/// Smoke test compiled only when `avro_custom_types` is disabled: a plain Int32 decoder
/// still decodes `1, 2, 3`.
#[cfg(not(feature = "avro_custom_types"))]
#[test]
fn test_no_custom_types_feature_smoke_decodes_plain_int32() {
    let int_type = avro_from_codec(Codec::Int32);
    let mut decoder = Decoder::try_new(&int_type).expect("create Int32 decoder");
    for value in [1, 2, 3] {
        let encoded = encode_avro_int(value);
        let mut cursor = AvroCursor::new(&encoded);
        decoder.decode(&mut cursor).expect("decode");
    }
    let flushed = decoder.flush(None).expect("flush");
    let ints = flushed
        .as_any()
        .downcast_ref::<Int32Array>()
        .expect("Int32Array");
    assert_eq!(ints.values(), &[1, 2, 3]);
}
5520
/// Nanosecond timestamps declared with `Tz::OffsetZero` decode to a
/// `TimestampNanosecondArray` whose timezone is `"+00:00"`.
#[test]
fn test_timestamp_nanos_decoding_offset_zero() {
    let avro_type = avro_from_codec(Codec::TimestampNanos(Some(Tz::OffsetZero)));
    let mut decoder = Decoder::try_new(&avro_type).expect("create TimestampNanos decoder");
    let samples = [0_i64, 1_i64, -1_i64, 1_234_567_890_i64];
    let mut data = Vec::new();
    for sample in samples {
        data.extend_from_slice(&encode_avro_long(sample));
    }
    // All values share one cursor, so each decode call consumes the next long.
    let mut cursor = AvroCursor::new(&data);
    for _ in 0..samples.len() {
        decoder.decode(&mut cursor).expect("decode nanos ts");
    }
    let flushed = decoder.flush(None).expect("flush nanos ts");
    let timestamps = flushed
        .as_any()
        .downcast_ref::<TimestampNanosecondArray>()
        .expect("TimestampNanosecondArray");
    assert_eq!(timestamps.values(), &[0, 1, -1, 1_234_567_890]);
    match timestamps.data_type() {
        DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, tz) => {
            assert_eq!(tz.as_deref(), Some("+00:00"));
        }
        other => panic!("expected Timestamp(Nanosecond, Some(\"+00:00\")), got {other:?}"),
    }
}
5546
/// Nanosecond timestamps declared with `Tz::Utc` decode to a
/// `TimestampNanosecondArray` whose timezone is `"UTC"`.
#[test]
fn test_timestamp_nanos_decoding_utc() {
    let avro_type = avro_from_codec(Codec::TimestampNanos(Some(Tz::Utc)));
    let mut decoder = Decoder::try_new(&avro_type).expect("create TimestampNanos decoder");
    let samples = [0_i64, 1_i64, -1_i64, 1_234_567_890_i64];
    let mut data = Vec::new();
    for sample in samples {
        data.extend_from_slice(&encode_avro_long(sample));
    }
    // All values share one cursor, so each decode call consumes the next long.
    let mut cursor = AvroCursor::new(&data);
    for _ in 0..samples.len() {
        decoder.decode(&mut cursor).expect("decode nanos ts");
    }
    let flushed = decoder.flush(None).expect("flush nanos ts");
    let timestamps = flushed
        .as_any()
        .downcast_ref::<TimestampNanosecondArray>()
        .expect("TimestampNanosecondArray");
    assert_eq!(timestamps.values(), &[0, 1, -1, 1_234_567_890]);
    match timestamps.data_type() {
        DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, tz) => {
            assert_eq!(tz.as_deref(), Some("UTC"));
        }
        other => panic!("expected Timestamp(Nanosecond, Some(\"UTC\")), got {other:?}"),
    }
}
5572
/// Nanosecond timestamps declared without a timezone decode to a
/// `TimestampNanosecondArray` whose timezone is `None`.
#[test]
fn test_timestamp_nanos_decoding_local() {
    let avro_type = avro_from_codec(Codec::TimestampNanos(None));
    let mut decoder = Decoder::try_new(&avro_type).expect("create TimestampNanos decoder");
    let samples = [10_i64, 20_i64, -30_i64];
    let mut data = Vec::new();
    for sample in samples {
        data.extend_from_slice(&encode_avro_long(sample));
    }
    // All values share one cursor, so each decode call consumes the next long.
    let mut cursor = AvroCursor::new(&data);
    for _ in 0..samples.len() {
        decoder.decode(&mut cursor).expect("decode nanos ts");
    }
    let flushed = decoder.flush(None).expect("flush nanos ts");
    let timestamps = flushed
        .as_any()
        .downcast_ref::<TimestampNanosecondArray>()
        .expect("TimestampNanosecondArray");
    assert_eq!(timestamps.values(), &[10, 20, -30]);
    match timestamps.data_type() {
        DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, tz) => {
            assert_eq!(tz.as_deref(), None);
        }
        other => panic!("expected Timestamp(Nanosecond, None), got {other:?}"),
    }
}
5598
/// A null-first nullable nanosecond timestamp decodes the rows
/// `(tag 1, 42)`, `(tag 0)`, `(tag 1, -7)` as valid 42, null, valid -7.
#[test]
fn test_timestamp_nanos_decoding_with_nulls() {
    let avro_type = AvroDataType::new(
        Codec::TimestampNanos(None),
        Default::default(),
        Some(Nullability::NullFirst),
    );
    let mut decoder = Decoder::try_new(&avro_type).expect("create nullable TimestampNanos");
    // Stream layout: tag=1, value=42 | tag=0 | tag=1, value=-7.
    let mut data = Vec::new();
    for encoded_long in [1_i64, 42, 0, 1, -7] {
        data.extend_from_slice(&encode_avro_long(encoded_long));
    }
    let mut cursor = AvroCursor::new(&data);
    for _row in 0..3 {
        decoder
            .decode(&mut cursor)
            .expect("decode nullable nanos ts");
    }
    let flushed = decoder.flush(None).expect("flush nullable nanos ts");
    let timestamps = flushed
        .as_any()
        .downcast_ref::<TimestampNanosecondArray>()
        .expect("TimestampNanosecondArray");
    assert_eq!(timestamps.len(), 3);
    assert!(timestamps.is_valid(0));
    assert!(timestamps.is_null(1));
    assert!(timestamps.is_valid(2));
    assert_eq!(timestamps.value(0), 42);
    assert_eq!(timestamps.value(2), -7);
    match timestamps.data_type() {
        DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, tz) => {
            assert_eq!(tz.as_deref(), None);
        }
        other => panic!("expected Timestamp(Nanosecond, None), got {other:?}"),
    }
}
5635}