1use crate::codec::{
21 AvroDataType, AvroField, AvroLiteral, Codec, Promotion, ResolutionInfo, ResolvedRecord,
22 ResolvedUnion,
23};
24use crate::errors::AvroError;
25use crate::reader::cursor::AvroCursor;
26use crate::schema::Nullability;
27#[cfg(feature = "small_decimals")]
28use arrow_array::builder::{Decimal32Builder, Decimal64Builder};
29use arrow_array::builder::{Decimal128Builder, Decimal256Builder, IntervalMonthDayNanoBuilder};
30use arrow_array::types::*;
31use arrow_array::*;
32use arrow_buffer::*;
33#[cfg(feature = "small_decimals")]
34use arrow_schema::{DECIMAL32_MAX_PRECISION, DECIMAL64_MAX_PRECISION};
35use arrow_schema::{
36 DECIMAL128_MAX_PRECISION, DECIMAL256_MAX_PRECISION, DataType, Field as ArrowField, FieldRef,
37 Fields, Schema as ArrowSchema, SchemaRef, UnionFields, UnionMode,
38};
39#[cfg(feature = "avro_custom_types")]
40use arrow_select::take::{TakeOptions, take};
41use std::cmp::Ordering;
42use std::sync::Arc;
43use strum_macros::AsRefStr;
44use uuid::Uuid;
45
/// Initial capacity handed to the value vectors, offset builders, bit builders and
/// null-buffer builders of every freshly constructed decoder in this file.
const DEFAULT_CAPACITY: usize = 1024;
47
/// Plan stored alongside a `Decoder::Nullable` that records how the nullable value is sourced.
///
/// NOTE(review): the code that consumes this plan is outside the visible part of the file;
/// the variant descriptions below are based only on how the plan is chosen at construction.
#[derive(Clone, Copy, Debug)]
enum NullablePlan {
    /// Default plan for a nullable decoder.
    /// Presumably the union branch tag is read from the encoded data — confirm in `decode`.
    ReadTag,
    /// Chosen when the resolution info says the writer is *not* a union while the reader is;
    /// `promotion` is copied from the first `writer_to_reader` entry of the resolved union.
    FromSingle { promotion: Promotion },
}
57
/// Reads one decimal from `$buf` as `$N` big-endian bytes (via `read_decimal_bytes_be`, with
/// `$size` forwarded to it), converts the bytes to `$Int` and appends the value to `$builder`.
///
/// Uses `?`, so it must be expanded inside a function returning a compatible `Result`.
macro_rules! decode_decimal {
    ($size:expr, $buf:expr, $builder:expr, $N:expr, $Int:ty) => {{
        let value = <$Int>::from_be_bytes(read_decimal_bytes_be::<{ $N }>($buf, $size)?);
        $builder.append_value(value);
    }};
}
65
/// Finishes `$builder`, rebuilds the array as `$ArrayTy` from the finished values and the
/// supplied `$nulls`, applies the precision/scale (a missing scale is treated as 0) and
/// evaluates to the result as an `ArrayRef`.
///
/// Uses `?`, so it must be expanded inside a function returning a compatible `Result`.
macro_rules! flush_decimal {
    ($builder:expr, $precision:expr, $scale:expr, $nulls:expr, $ArrayTy:ty) => {{
        // Only the values buffer of the finished array is kept; nulls come from `$nulls`.
        let (_, values, _) = $builder.finish().into_parts();
        let array = <$ArrayTy>::try_new(values, $nulls)?;
        let array =
            array.with_precision_and_scale(*$precision as u8, $scale.unwrap_or(0) as i8)?;
        let array: ArrayRef = Arc::new(array);
        array
    }};
}
75
/// Appends a decimal default value to `$builder`.
///
/// The literal must be `AvroLiteral::Bytes`; the bytes are passed through `sign_cast_to::<$N>`
/// and interpreted as a big-endian `$Int`. Any other literal yields
/// `AvroError::InvalidArgument` mentioning `$name`. Evaluates to `Result<(), AvroError>`.
macro_rules! append_decimal_default {
    ($lit:expr, $builder:expr, $N:literal, $Int:ty, $name:literal) => {{
        if let AvroLiteral::Bytes(raw) = $lit {
            let widened = sign_cast_to::<$N>(raw)?;
            $builder.append_value(<$Int>::from_be_bytes(widened));
            Ok(())
        } else {
            Err(AvroError::InvalidArgument(
                concat!(
                    "Default for ",
                    $name,
                    " must be bytes (two's-complement big-endian)"
                )
                .to_string(),
            ))
        }
    }};
}
98
/// Decodes top-level Avro records into Arrow `RecordBatch`es.
#[derive(Debug)]
pub(crate) struct RecordDecoder {
    /// Arrow schema assembled from the reader fields at construction time.
    schema: SchemaRef,
    /// One decoder per reader field, in reader-field order.
    fields: Vec<Decoder>,
    /// Present only when the data type carried `ResolutionInfo::Record`; when set, records
    /// are decoded through `Projector::project_record` instead of field by field.
    projector: Option<Projector>,
}
106
107impl RecordDecoder {
108 pub(crate) fn try_new_with_options(data_type: &AvroDataType) -> Result<Self, AvroError> {
119 match data_type.codec() {
120 Codec::Struct(reader_fields) => {
121 let mut arrow_fields = Vec::with_capacity(reader_fields.len());
123 let mut encodings = Vec::with_capacity(reader_fields.len());
124 for avro_field in reader_fields.iter() {
125 arrow_fields.push(avro_field.field());
126 encodings.push(Decoder::try_new(avro_field.data_type())?);
127 }
128 let projector = match data_type.resolution.as_ref() {
129 Some(ResolutionInfo::Record(rec)) => {
130 Some(ProjectorBuilder::try_new(rec, reader_fields).build()?)
131 }
132 _ => None,
133 };
134 Ok(Self {
135 schema: Arc::new(ArrowSchema::new(arrow_fields)),
136 fields: encodings,
137 projector,
138 })
139 }
140 other => Err(AvroError::ParseError(format!(
141 "Expected record got {other:?}"
142 ))),
143 }
144 }
145
146 pub(crate) fn schema(&self) -> &SchemaRef {
148 &self.schema
149 }
150
151 pub(crate) fn decode(&mut self, buf: &[u8], count: usize) -> Result<usize, AvroError> {
153 let mut cursor = AvroCursor::new(buf);
154 match self.projector.as_mut() {
155 Some(proj) => {
156 for _ in 0..count {
157 proj.project_record(&mut cursor, &mut self.fields)?;
158 }
159 }
160 None => {
161 for _ in 0..count {
162 for field in &mut self.fields {
163 field.decode(&mut cursor)?;
164 }
165 }
166 }
167 }
168 Ok(cursor.position())
169 }
170
171 pub(crate) fn flush(&mut self) -> Result<RecordBatch, AvroError> {
173 let arrays = self
174 .fields
175 .iter_mut()
176 .map(|x| x.flush(None))
177 .collect::<Result<Vec<_>, _>>()?;
178 RecordBatch::try_new(self.schema.clone(), arrays).map_err(Into::into)
179 }
180}
181
/// Enum resolution data attached to a `Decoder::Enum`, copied from
/// `ResolutionInfo::EnumMapping` when the decoder is built.
#[derive(Debug)]
struct EnumResolution {
    /// Index mapping taken from the resolution info.
    /// Presumably indexed by writer symbol position and yielding the reader position —
    /// TODO confirm against the enum decode path.
    mapping: Arc<[i32]>,
    /// Default index taken from the resolution info.
    /// NOTE(review): likely the reader's default symbol used for unmapped writer symbols — confirm.
    default_index: i32,
}
187
/// Per-field decoder state: one variant per supported codec (plus promotion variants), each
/// holding the buffers or builders that accumulate values until they are flushed.
///
/// Variants gated on `avro_custom_types` / `small_decimals` only exist when the corresponding
/// cargo feature is enabled.
#[derive(Debug, AsRefStr)]
enum Decoder {
    /// Counts the values seen; no data is stored.
    Null(usize),
    Boolean(BooleanBufferBuilder),
    Int32(Vec<i32>),
    Int64(Vec<i64>),
    #[cfg(feature = "avro_custom_types")]
    DurationSecond(Vec<i64>),
    #[cfg(feature = "avro_custom_types")]
    DurationMillisecond(Vec<i64>),
    #[cfg(feature = "avro_custom_types")]
    DurationMicrosecond(Vec<i64>),
    #[cfg(feature = "avro_custom_types")]
    DurationNanosecond(Vec<i64>),
    #[cfg(feature = "avro_custom_types")]
    Int8(Vec<i8>),
    #[cfg(feature = "avro_custom_types")]
    Int16(Vec<i16>),
    #[cfg(feature = "avro_custom_types")]
    UInt8(Vec<u8>),
    #[cfg(feature = "avro_custom_types")]
    UInt16(Vec<u16>),
    #[cfg(feature = "avro_custom_types")]
    UInt32(Vec<u32>),
    #[cfg(feature = "avro_custom_types")]
    UInt64(Vec<u64>),
    /// Half-precision floats kept as their raw 16-bit patterns.
    #[cfg(feature = "avro_custom_types")]
    Float16(Vec<u16>),
    #[cfg(feature = "avro_custom_types")]
    Date64(Vec<i64>),
    #[cfg(feature = "avro_custom_types")]
    TimeNanos(Vec<i64>),
    #[cfg(feature = "avro_custom_types")]
    Time32Secs(Vec<i32>),
    /// The `bool` is the codec's `is_utc` flag.
    #[cfg(feature = "avro_custom_types")]
    TimestampSecs(bool, Vec<i64>),
    #[cfg(feature = "avro_custom_types")]
    IntervalYearMonth(Vec<i32>),
    #[cfg(feature = "avro_custom_types")]
    IntervalMonthDayNano(Vec<IntervalMonthDayNano>),
    #[cfg(feature = "avro_custom_types")]
    IntervalDayTime(Vec<IntervalDayTime>),
    Float32(Vec<f32>),
    Float64(Vec<f64>),
    Date32(Vec<i32>),
    TimeMillis(Vec<i32>),
    TimeMicros(Vec<i64>),
    /// The `bool` in the timestamp variants is the codec's `is_utc` flag.
    TimestampMillis(bool, Vec<i64>),
    TimestampMicros(bool, Vec<i64>),
    TimestampNanos(bool, Vec<i64>),
    // Promotion variants: selected when the data type carries `ResolutionInfo::Promotion`;
    // the buffer already has the reader's (promoted) element type.
    Int32ToInt64(Vec<i64>),
    Int32ToFloat32(Vec<f32>),
    Int32ToFloat64(Vec<f64>),
    Int64ToFloat32(Vec<f32>),
    Int64ToFloat64(Vec<f64>),
    Float32ToFloat64(Vec<f64>),
    BytesToString(OffsetBufferBuilder<i32>, Vec<u8>),
    StringToBytes(OffsetBufferBuilder<i32>, Vec<u8>),
    // Variable-length values: (offsets, concatenated value bytes).
    Binary(OffsetBufferBuilder<i32>, Vec<u8>),
    String(OffsetBufferBuilder<i32>, Vec<u8>),
    StringView(OffsetBufferBuilder<i32>, Vec<u8>),
    /// (item field, list offsets, item decoder).
    Array(FieldRef, OffsetBufferBuilder<i32>, Box<Decoder>),
    /// (Arrow fields, one decoder per field, optional projector for resolved records).
    Record(Fields, Vec<Decoder>, Option<Projector>),
    /// (entries field, key offsets, map offsets, key bytes, value decoder).
    Map(
        FieldRef,
        OffsetBufferBuilder<i32>,
        OffsetBufferBuilder<i32>,
        Vec<u8>,
        Box<Decoder>,
    ),
    /// (size in bytes of each value, concatenated value bytes).
    Fixed(i32, Vec<u8>),
    /// (decoded symbol indices, reader symbols, optional writer/reader resolution).
    Enum(Vec<i32>, Arc<[String]>, Option<EnumResolution>),
    Duration(IntervalMonthDayNanoBuilder),
    /// Concatenated 16-byte UUID values.
    Uuid(Vec<u8>),
    // Decimal variants: (precision, scale, fixed size if any, builder).
    #[cfg(feature = "small_decimals")]
    Decimal32(usize, Option<usize>, Option<usize>, Decimal32Builder),
    #[cfg(feature = "small_decimals")]
    Decimal64(usize, Option<usize>, Option<usize>, Decimal64Builder),
    Decimal128(usize, Option<usize>, Option<usize>, Decimal128Builder),
    Decimal256(usize, Option<usize>, Option<usize>, Decimal256Builder),
    /// (run-end width in bytes, number of logical values appended, values decoder).
    #[cfg(feature = "avro_custom_types")]
    RunEndEncoded(u8, usize, Box<Decoder>),
    Union(UnionDecoder),
    /// (nullability, validity builder, inner decoder, plan for sourcing the value).
    Nullable(Nullability, NullBufferBuilder, Box<Decoder>, NullablePlan),
}
275
276impl Decoder {
277 fn try_new(data_type: &AvroDataType) -> Result<Self, AvroError> {
278 if let Some(ResolutionInfo::Union(info)) = data_type.resolution.as_ref() {
279 if info.writer_is_union && !info.reader_is_union {
280 let mut clone = data_type.clone();
281 clone.resolution = None; let target = Box::new(Self::try_new_internal(&clone)?);
283 let decoder = Self::Union(
284 UnionDecoderBuilder::new()
285 .with_resolved_union(info.clone())
286 .with_target(target)
287 .build()?,
288 );
289 return Ok(decoder);
290 }
291 }
292 Self::try_new_internal(data_type)
293 }
294
    /// Builds the decoder matching `data_type`'s codec and, if `data_type.nullability()` is
    /// `Some`, wraps it in `Decoder::Nullable`.
    ///
    /// # Errors
    ///
    /// * `AvroError::ParseError` when a decimal precision exceeds `DECIMAL256_MAX_PRECISION`.
    /// * `AvroError::SchemaError` when a union's field and branch counts disagree or the
    ///   branch count is too large.
    /// * `AvroError::NYI` for non-dense unions.
    /// * `AvroError::InvalidArgument` for an unsupported run-end width.
    /// * Any error propagated from building child decoders, builders or projectors.
    fn try_new_internal(data_type: &AvroDataType) -> Result<Self, AvroError> {
        // Only a primitive promotion is relevant here; other resolution kinds are handled
        // by the individual arms below.
        let promotion = match data_type.resolution.as_ref() {
            Some(ResolutionInfo::Promotion(p)) => Some(p),
            _ => None,
        };
        let decoder = match (data_type.codec(), promotion) {
            // Promotion arms must come first: the plain `(Codec::X, _)` arms further down
            // would otherwise match the same codecs.
            (Codec::Int64, Some(Promotion::IntToLong)) => {
                Self::Int32ToInt64(Vec::with_capacity(DEFAULT_CAPACITY))
            }
            (Codec::Float32, Some(Promotion::IntToFloat)) => {
                Self::Int32ToFloat32(Vec::with_capacity(DEFAULT_CAPACITY))
            }
            (Codec::Float64, Some(Promotion::IntToDouble)) => {
                Self::Int32ToFloat64(Vec::with_capacity(DEFAULT_CAPACITY))
            }
            (Codec::Float32, Some(Promotion::LongToFloat)) => {
                Self::Int64ToFloat32(Vec::with_capacity(DEFAULT_CAPACITY))
            }
            (Codec::Float64, Some(Promotion::LongToDouble)) => {
                Self::Int64ToFloat64(Vec::with_capacity(DEFAULT_CAPACITY))
            }
            (Codec::Float64, Some(Promotion::FloatToDouble)) => {
                Self::Float32ToFloat64(Vec::with_capacity(DEFAULT_CAPACITY))
            }
            (Codec::Utf8, Some(Promotion::BytesToString))
            | (Codec::Utf8View, Some(Promotion::BytesToString)) => Self::BytesToString(
                OffsetBufferBuilder::new(DEFAULT_CAPACITY),
                Vec::with_capacity(DEFAULT_CAPACITY),
            ),
            (Codec::Binary, Some(Promotion::StringToBytes)) => Self::StringToBytes(
                OffsetBufferBuilder::new(DEFAULT_CAPACITY),
                Vec::with_capacity(DEFAULT_CAPACITY),
            ),
            // Plain (non-promoted) primitives.
            (Codec::Null, _) => Self::Null(0),
            (Codec::Boolean, _) => Self::Boolean(BooleanBufferBuilder::new(DEFAULT_CAPACITY)),
            (Codec::Int32, _) => Self::Int32(Vec::with_capacity(DEFAULT_CAPACITY)),
            (Codec::Int64, _) => Self::Int64(Vec::with_capacity(DEFAULT_CAPACITY)),
            (Codec::Float32, _) => Self::Float32(Vec::with_capacity(DEFAULT_CAPACITY)),
            (Codec::Float64, _) => Self::Float64(Vec::with_capacity(DEFAULT_CAPACITY)),
            (Codec::Binary, _) => Self::Binary(
                OffsetBufferBuilder::new(DEFAULT_CAPACITY),
                Vec::with_capacity(DEFAULT_CAPACITY),
            ),
            (Codec::Utf8, _) => Self::String(
                OffsetBufferBuilder::new(DEFAULT_CAPACITY),
                Vec::with_capacity(DEFAULT_CAPACITY),
            ),
            (Codec::Utf8View, _) => Self::StringView(
                OffsetBufferBuilder::new(DEFAULT_CAPACITY),
                Vec::with_capacity(DEFAULT_CAPACITY),
            ),
            // Date/time logical types.
            (Codec::Date32, _) => Self::Date32(Vec::with_capacity(DEFAULT_CAPACITY)),
            (Codec::TimeMillis, _) => Self::TimeMillis(Vec::with_capacity(DEFAULT_CAPACITY)),
            (Codec::TimeMicros, _) => Self::TimeMicros(Vec::with_capacity(DEFAULT_CAPACITY)),
            (Codec::TimestampMillis(is_utc), _) => {
                Self::TimestampMillis(*is_utc, Vec::with_capacity(DEFAULT_CAPACITY))
            }
            (Codec::TimestampMicros(is_utc), _) => {
                Self::TimestampMicros(*is_utc, Vec::with_capacity(DEFAULT_CAPACITY))
            }
            (Codec::TimestampNanos(is_utc), _) => {
                Self::TimestampNanos(*is_utc, Vec::with_capacity(DEFAULT_CAPACITY))
            }
            // Codecs only available with the `avro_custom_types` feature.
            #[cfg(feature = "avro_custom_types")]
            (Codec::DurationNanos, _) => {
                Self::DurationNanosecond(Vec::with_capacity(DEFAULT_CAPACITY))
            }
            #[cfg(feature = "avro_custom_types")]
            (Codec::DurationMicros, _) => {
                Self::DurationMicrosecond(Vec::with_capacity(DEFAULT_CAPACITY))
            }
            #[cfg(feature = "avro_custom_types")]
            (Codec::DurationMillis, _) => {
                Self::DurationMillisecond(Vec::with_capacity(DEFAULT_CAPACITY))
            }
            #[cfg(feature = "avro_custom_types")]
            (Codec::DurationSeconds, _) => {
                Self::DurationSecond(Vec::with_capacity(DEFAULT_CAPACITY))
            }
            #[cfg(feature = "avro_custom_types")]
            (Codec::Int8, _) => Self::Int8(Vec::with_capacity(DEFAULT_CAPACITY)),
            #[cfg(feature = "avro_custom_types")]
            (Codec::Int16, _) => Self::Int16(Vec::with_capacity(DEFAULT_CAPACITY)),
            #[cfg(feature = "avro_custom_types")]
            (Codec::UInt8, _) => Self::UInt8(Vec::with_capacity(DEFAULT_CAPACITY)),
            #[cfg(feature = "avro_custom_types")]
            (Codec::UInt16, _) => Self::UInt16(Vec::with_capacity(DEFAULT_CAPACITY)),
            #[cfg(feature = "avro_custom_types")]
            (Codec::UInt32, _) => Self::UInt32(Vec::with_capacity(DEFAULT_CAPACITY)),
            #[cfg(feature = "avro_custom_types")]
            (Codec::UInt64, _) => Self::UInt64(Vec::with_capacity(DEFAULT_CAPACITY)),
            #[cfg(feature = "avro_custom_types")]
            (Codec::Float16, _) => Self::Float16(Vec::with_capacity(DEFAULT_CAPACITY)),
            #[cfg(feature = "avro_custom_types")]
            (Codec::Date64, _) => Self::Date64(Vec::with_capacity(DEFAULT_CAPACITY)),
            #[cfg(feature = "avro_custom_types")]
            (Codec::TimeNanos, _) => Self::TimeNanos(Vec::with_capacity(DEFAULT_CAPACITY)),
            #[cfg(feature = "avro_custom_types")]
            (Codec::Time32Secs, _) => Self::Time32Secs(Vec::with_capacity(DEFAULT_CAPACITY)),
            #[cfg(feature = "avro_custom_types")]
            (Codec::TimestampSecs(is_utc), _) => {
                Self::TimestampSecs(*is_utc, Vec::with_capacity(DEFAULT_CAPACITY))
            }
            #[cfg(feature = "avro_custom_types")]
            (Codec::IntervalYearMonth, _) => {
                Self::IntervalYearMonth(Vec::with_capacity(DEFAULT_CAPACITY))
            }
            #[cfg(feature = "avro_custom_types")]
            (Codec::IntervalMonthDayNano, _) => {
                Self::IntervalMonthDayNano(Vec::with_capacity(DEFAULT_CAPACITY))
            }
            #[cfg(feature = "avro_custom_types")]
            (Codec::IntervalDayTime, _) => {
                Self::IntervalDayTime(Vec::with_capacity(DEFAULT_CAPACITY))
            }
            (Codec::Fixed(sz), _) => Self::Fixed(*sz, Vec::with_capacity(DEFAULT_CAPACITY)),
            (Codec::Decimal(precision, scale, size), _) => {
                let p = *precision;
                let s = *scale;
                // NOTE(review): both casts truncate. `prec` is only used after `p` has passed
                // one of the DECIMAL*_MAX_PRECISION checks below; the scale is not
                // range-checked here — presumably it was validated when the schema was
                // parsed. Confirm.
                let prec = p as u8;
                let scl = s.unwrap_or(0) as i8;
                // Pick the narrowest decimal builder whose maximum precision fits `p`.
                #[cfg(feature = "small_decimals")]
                {
                    if p <= DECIMAL32_MAX_PRECISION as usize {
                        let builder = Decimal32Builder::with_capacity(DEFAULT_CAPACITY)
                            .with_precision_and_scale(prec, scl)?;
                        Self::Decimal32(p, s, *size, builder)
                    } else if p <= DECIMAL64_MAX_PRECISION as usize {
                        let builder = Decimal64Builder::with_capacity(DEFAULT_CAPACITY)
                            .with_precision_and_scale(prec, scl)?;
                        Self::Decimal64(p, s, *size, builder)
                    } else if p <= DECIMAL128_MAX_PRECISION as usize {
                        let builder = Decimal128Builder::with_capacity(DEFAULT_CAPACITY)
                            .with_precision_and_scale(prec, scl)?;
                        Self::Decimal128(p, s, *size, builder)
                    } else if p <= DECIMAL256_MAX_PRECISION as usize {
                        let builder = Decimal256Builder::with_capacity(DEFAULT_CAPACITY)
                            .with_precision_and_scale(prec, scl)?;
                        Self::Decimal256(p, s, *size, builder)
                    } else {
                        return Err(AvroError::ParseError(format!(
                            "Decimal precision {p} exceeds maximum supported"
                        )));
                    }
                }
                // Without `small_decimals` only the 128- and 256-bit builders are considered.
                #[cfg(not(feature = "small_decimals"))]
                {
                    if p <= DECIMAL128_MAX_PRECISION as usize {
                        let builder = Decimal128Builder::with_capacity(DEFAULT_CAPACITY)
                            .with_precision_and_scale(prec, scl)?;
                        Self::Decimal128(p, s, *size, builder)
                    } else if p <= DECIMAL256_MAX_PRECISION as usize {
                        let builder = Decimal256Builder::with_capacity(DEFAULT_CAPACITY)
                            .with_precision_and_scale(prec, scl)?;
                        Self::Decimal256(p, s, *size, builder)
                    } else {
                        return Err(AvroError::ParseError(format!(
                            "Decimal precision {p} exceeds maximum supported"
                        )));
                    }
                }
            }
            (Codec::Interval, _) => Self::Duration(IntervalMonthDayNanoBuilder::new()),
            (Codec::List(item), _) => {
                // Items go through `try_new` so writer-union resolution applies to them too.
                let decoder = Self::try_new(item)?;
                Self::Array(
                    Arc::new(item.field_with_name("item")),
                    OffsetBufferBuilder::new(DEFAULT_CAPACITY),
                    Box::new(decoder),
                )
            }
            (Codec::Enum(symbols), _) => {
                // Carry over the writer/reader enum mapping when one was resolved.
                let res = match data_type.resolution.as_ref() {
                    Some(ResolutionInfo::EnumMapping(mapping)) => Some(EnumResolution {
                        mapping: mapping.mapping.clone(),
                        default_index: mapping.default_index,
                    }),
                    _ => None,
                };
                Self::Enum(Vec::with_capacity(DEFAULT_CAPACITY), symbols.clone(), res)
            }
            (Codec::Struct(fields), _) => {
                let mut arrow_fields = Vec::with_capacity(fields.len());
                let mut encodings = Vec::with_capacity(fields.len());
                for avro_field in fields.iter() {
                    let encoding = Self::try_new(avro_field.data_type())?;
                    arrow_fields.push(avro_field.field());
                    encodings.push(encoding);
                }
                // A projector is only needed when the record was resolved against a writer.
                let projector =
                    if let Some(ResolutionInfo::Record(rec)) = data_type.resolution.as_ref() {
                        Some(ProjectorBuilder::try_new(rec, fields).build()?)
                    } else {
                        None
                    };
                Self::Record(arrow_fields.into(), encodings, projector)
            }
            (Codec::Map(child), _) => {
                // Map entries are a non-nullable struct of a Utf8 "key" and the child "value".
                let val_field = child.field_with_name("value");
                let map_field = Arc::new(ArrowField::new(
                    "entries",
                    DataType::Struct(Fields::from(vec![
                        ArrowField::new("key", DataType::Utf8, false),
                        val_field,
                    ])),
                    false,
                ));
                let val_dec = Self::try_new(child)?;
                Self::Map(
                    map_field,
                    OffsetBufferBuilder::new(DEFAULT_CAPACITY),
                    OffsetBufferBuilder::new(DEFAULT_CAPACITY),
                    Vec::with_capacity(DEFAULT_CAPACITY),
                    Box::new(val_dec),
                )
            }
            (Codec::Uuid, _) => Self::Uuid(Vec::with_capacity(DEFAULT_CAPACITY)),
            (Codec::Union(encodings, fields, UnionMode::Dense), _) => {
                // Branch decoders are built with `try_new_internal` directly, not `try_new`.
                let decoders = encodings
                    .iter()
                    .map(Self::try_new_internal)
                    .collect::<Result<Vec<_>, _>>()?;
                if fields.len() != decoders.len() {
                    return Err(AvroError::SchemaError(format!(
                        "Union has {} fields but {} decoders",
                        fields.len(),
                        decoders.len()
                    )));
                }
                let branch_count = decoders.len();
                // Branch indices 0..=i32::MAX are the ones an Avro int tag can address.
                let max_addr = (i32::MAX as usize) + 1;
                if branch_count > max_addr {
                    return Err(AvroError::SchemaError(format!(
                        "Union has {branch_count} branches, which exceeds the maximum addressable \
                         branches by an Avro int tag ({} + 1).",
                        i32::MAX
                    )));
                }
                let mut builder = UnionDecoderBuilder::new()
                    .with_fields(fields.clone())
                    .with_branches(decoders);
                // Resolution info is attached only when the reader side is itself a union.
                if let Some(ResolutionInfo::Union(info)) = data_type.resolution.as_ref() {
                    if info.reader_is_union {
                        builder = builder.with_resolved_union(info.clone());
                    }
                }
                Self::Union(builder.build()?)
            }
            // Any union mode other than dense.
            (Codec::Union(_, _, _), _) => {
                return Err(AvroError::NYI(
                    "Sparse Arrow unions are not yet supported".to_string(),
                ));
            }
            #[cfg(feature = "avro_custom_types")]
            (Codec::RunEndEncoded(values_dt, width_bits_or_bytes), _) => {
                let inner = Self::try_new(values_dt)?;
                // The width may be given in bytes (2/4/8) or bits (16/32/64); normalize to bytes.
                let byte_width: u8 = match *width_bits_or_bytes {
                    2 | 4 | 8 => *width_bits_or_bytes,
                    16 => 2,
                    32 => 4,
                    64 => 8,
                    other => {
                        return Err(AvroError::InvalidArgument(format!(
                            "Unsupported run-end width {other} for RunEndEncoded; \
                             expected 16/32/64 bits or 2/4/8 bytes"
                        )));
                    }
                };
                Self::RunEndEncoded(byte_width, 0, Box::new(inner))
            }
        };
        Ok(match data_type.nullability() {
            Some(nullability) => {
                let mut plan = NullablePlan::ReadTag;
                // Writer is a single (non-union) type but the reader is a union: use the
                // promotion recorded for the first writer-to-reader entry, if there is one.
                if let Some(ResolutionInfo::Union(info)) = data_type.resolution.as_ref() {
                    if !info.writer_is_union && info.reader_is_union {
                        if let Some(Some((_reader_idx, promo))) = info.writer_to_reader.first() {
                            plan = NullablePlan::FromSingle { promotion: *promo };
                        }
                    }
                }
                Self::Nullable(
                    nullability,
                    NullBufferBuilder::new(DEFAULT_CAPACITY),
                    Box::new(decoder),
                    plan,
                )
            }
            None => decoder,
        })
    }
590
591 fn append_null(&mut self) -> Result<(), AvroError> {
593 match self {
594 Self::Null(count) => *count += 1,
595 Self::Boolean(b) => b.append(false),
596 Self::Int32(v) | Self::Date32(v) | Self::TimeMillis(v) => v.push(0),
597 Self::Int64(v)
598 | Self::Int32ToInt64(v)
599 | Self::TimeMicros(v)
600 | Self::TimestampMillis(_, v)
601 | Self::TimestampMicros(_, v)
602 | Self::TimestampNanos(_, v) => v.push(0),
603 #[cfg(feature = "avro_custom_types")]
604 Self::DurationSecond(v)
605 | Self::DurationMillisecond(v)
606 | Self::DurationMicrosecond(v)
607 | Self::DurationNanosecond(v) => v.push(0),
608 #[cfg(feature = "avro_custom_types")]
609 Self::Int8(v) => v.push(0),
610 #[cfg(feature = "avro_custom_types")]
611 Self::Int16(v) => v.push(0),
612 #[cfg(feature = "avro_custom_types")]
613 Self::UInt8(v) => v.push(0),
614 #[cfg(feature = "avro_custom_types")]
615 Self::UInt16(v) => v.push(0),
616 #[cfg(feature = "avro_custom_types")]
617 Self::UInt32(v) => v.push(0),
618 #[cfg(feature = "avro_custom_types")]
619 Self::UInt64(v) => v.push(0),
620 #[cfg(feature = "avro_custom_types")]
621 Self::Float16(v) => v.push(0),
622 #[cfg(feature = "avro_custom_types")]
623 Self::Date64(v) | Self::TimeNanos(v) | Self::TimestampSecs(_, v) => v.push(0),
624 #[cfg(feature = "avro_custom_types")]
625 Self::IntervalDayTime(v) => v.push(IntervalDayTime::new(0, 0)),
626 #[cfg(feature = "avro_custom_types")]
627 Self::IntervalMonthDayNano(v) => v.push(IntervalMonthDayNano::new(0, 0, 0)),
628 #[cfg(feature = "avro_custom_types")]
629 Self::Time32Secs(v) | Self::IntervalYearMonth(v) => v.push(0),
630 Self::Float32(v) | Self::Int32ToFloat32(v) | Self::Int64ToFloat32(v) => v.push(0.),
631 Self::Float64(v)
632 | Self::Int32ToFloat64(v)
633 | Self::Int64ToFloat64(v)
634 | Self::Float32ToFloat64(v) => v.push(0.),
635 Self::Binary(offsets, _)
636 | Self::String(offsets, _)
637 | Self::StringView(offsets, _)
638 | Self::BytesToString(offsets, _)
639 | Self::StringToBytes(offsets, _) => {
640 offsets.push_length(0);
641 }
642 Self::Uuid(v) => {
643 v.extend([0; 16]);
644 }
645 Self::Array(_, offsets, _) => {
646 offsets.push_length(0);
647 }
648 Self::Record(_, e, _) => {
649 for encoding in e.iter_mut() {
650 encoding.append_null()?;
651 }
652 }
653 Self::Map(_, _koff, moff, _, _) => {
654 moff.push_length(0);
655 }
656 Self::Fixed(sz, accum) => {
657 accum.extend(std::iter::repeat_n(0u8, *sz as usize));
658 }
659 #[cfg(feature = "small_decimals")]
660 Self::Decimal32(_, _, _, builder) => builder.append_value(0),
661 #[cfg(feature = "small_decimals")]
662 Self::Decimal64(_, _, _, builder) => builder.append_value(0),
663 Self::Decimal128(_, _, _, builder) => builder.append_value(0),
664 Self::Decimal256(_, _, _, builder) => builder.append_value(i256::ZERO),
665 Self::Enum(indices, _, _) => indices.push(0),
666 Self::Duration(builder) => builder.append_null(),
667 #[cfg(feature = "avro_custom_types")]
668 Self::RunEndEncoded(_, len, inner) => {
669 *len += 1;
670 inner.append_null()?;
671 }
672 Self::Union(u) => u.append_null()?,
673 Self::Nullable(_, null_buffer, inner, _) => {
674 null_buffer.append(false);
675 inner.append_null()?;
676 }
677 }
678 Ok(())
679 }
680
681 fn append_default(&mut self, lit: &AvroLiteral) -> Result<(), AvroError> {
683 match self {
684 Self::Nullable(_, nb, inner, _) => {
685 if matches!(lit, AvroLiteral::Null) {
686 nb.append(false);
687 inner.append_null()
688 } else {
689 nb.append(true);
690 inner.append_default(lit)
691 }
692 }
693 Self::Null(count) => match lit {
694 AvroLiteral::Null => {
695 *count += 1;
696 Ok(())
697 }
698 _ => Err(AvroError::InvalidArgument(
699 "Non-null default for null type".to_string(),
700 )),
701 },
702 Self::Boolean(b) => match lit {
703 AvroLiteral::Boolean(v) => {
704 b.append(*v);
705 Ok(())
706 }
707 _ => Err(AvroError::InvalidArgument(
708 "Default for boolean must be boolean".to_string(),
709 )),
710 },
711 Self::Int32(v) | Self::Date32(v) | Self::TimeMillis(v) => match lit {
712 AvroLiteral::Int(i) => {
713 v.push(*i);
714 Ok(())
715 }
716 _ => Err(AvroError::InvalidArgument(
717 "Default for int32/date32/time-millis must be int".to_string(),
718 )),
719 },
720 #[cfg(feature = "avro_custom_types")]
721 Self::DurationSecond(v)
722 | Self::DurationMillisecond(v)
723 | Self::DurationMicrosecond(v)
724 | Self::DurationNanosecond(v) => match lit {
725 AvroLiteral::Long(i) => {
726 v.push(*i);
727 Ok(())
728 }
729 _ => Err(AvroError::InvalidArgument(
730 "Default for duration long must be long".to_string(),
731 )),
732 },
733 #[cfg(feature = "avro_custom_types")]
734 Self::Int8(v) => match lit {
735 AvroLiteral::Int(i) => {
736 let x = i8::try_from(*i).map_err(|_| {
737 AvroError::InvalidArgument(format!(
738 "Default for int8 out of range for i8: {i}"
739 ))
740 })?;
741 v.push(x);
742 Ok(())
743 }
744 _ => Err(AvroError::InvalidArgument(
745 "Default for int8 must be int".to_string(),
746 )),
747 },
748 #[cfg(feature = "avro_custom_types")]
749 Self::Int16(v) => match lit {
750 AvroLiteral::Int(i) => {
751 let x = i16::try_from(*i).map_err(|_| {
752 AvroError::InvalidArgument(format!(
753 "Default for int16 out of range for i16: {i}"
754 ))
755 })?;
756 v.push(x);
757 Ok(())
758 }
759 _ => Err(AvroError::InvalidArgument(
760 "Default for int16 must be int".to_string(),
761 )),
762 },
763 #[cfg(feature = "avro_custom_types")]
764 Self::UInt8(v) => match lit {
765 AvroLiteral::Int(i) => {
766 let x = u8::try_from(*i).map_err(|_| {
767 AvroError::InvalidArgument(format!(
768 "Default for uint8 out of range for u8: {i}"
769 ))
770 })?;
771 v.push(x);
772 Ok(())
773 }
774 _ => Err(AvroError::InvalidArgument(
775 "Default for uint8 must be int".to_string(),
776 )),
777 },
778 #[cfg(feature = "avro_custom_types")]
779 Self::UInt16(v) => match lit {
780 AvroLiteral::Int(i) => {
781 let x = u16::try_from(*i).map_err(|_| {
782 AvroError::InvalidArgument(format!(
783 "Default for uint16 out of range for u16: {i}"
784 ))
785 })?;
786 v.push(x);
787 Ok(())
788 }
789 _ => Err(AvroError::InvalidArgument(
790 "Default for uint16 must be int".to_string(),
791 )),
792 },
793 #[cfg(feature = "avro_custom_types")]
794 Self::UInt32(v) => match lit {
795 AvroLiteral::Long(i) => {
796 let x = u32::try_from(*i).map_err(|_| {
797 AvroError::InvalidArgument(format!(
798 "Default for uint32 out of range for u32: {i}"
799 ))
800 })?;
801 v.push(x);
802 Ok(())
803 }
804 _ => Err(AvroError::InvalidArgument(
805 "Default for uint32 must be long".to_string(),
806 )),
807 },
808 #[cfg(feature = "avro_custom_types")]
809 Self::UInt64(v) => match lit {
810 AvroLiteral::Bytes(b) => {
811 if b.len() != 8 {
812 return Err(AvroError::InvalidArgument(format!(
813 "uint64 default must be exactly 8 bytes, got {}",
814 b.len()
815 )));
816 }
817 v.push(u64::from_le_bytes([
818 b[0], b[1], b[2], b[3], b[4], b[5], b[6], b[7],
819 ]));
820 Ok(())
821 }
822 _ => Err(AvroError::InvalidArgument(
823 "Default for uint64 must be bytes (8-byte LE)".to_string(),
824 )),
825 },
826 #[cfg(feature = "avro_custom_types")]
827 Self::Float16(v) => match lit {
828 AvroLiteral::Bytes(b) => {
829 if b.len() != 2 {
830 return Err(AvroError::InvalidArgument(format!(
831 "float16 default must be exactly 2 bytes, got {}",
832 b.len()
833 )));
834 }
835 v.push(u16::from_le_bytes([b[0], b[1]]));
836 Ok(())
837 }
838 _ => Err(AvroError::InvalidArgument(
839 "Default for float16 must be bytes (2-byte LE IEEE-754)".to_string(),
840 )),
841 },
842 #[cfg(feature = "avro_custom_types")]
843 Self::Date64(v) | Self::TimeNanos(v) | Self::TimestampSecs(_, v) => match lit {
844 AvroLiteral::Long(i) => {
845 v.push(*i);
846 Ok(())
847 }
848 _ => Err(AvroError::InvalidArgument(
849 "Default for date64/time-nanos/timestamp-secs must be long".to_string(),
850 )),
851 },
852 #[cfg(feature = "avro_custom_types")]
853 Self::Time32Secs(v) => match lit {
854 AvroLiteral::Int(i) => {
855 v.push(*i);
856 Ok(())
857 }
858 _ => Err(AvroError::InvalidArgument(
859 "Default for time32-secs must be int".to_string(),
860 )),
861 },
862 #[cfg(feature = "avro_custom_types")]
863 Self::IntervalYearMonth(v) => match lit {
864 AvroLiteral::Bytes(b) => {
865 if b.len() != 4 {
866 return Err(AvroError::InvalidArgument(format!(
867 "interval-year-month default must be exactly 4 bytes, got {}",
868 b.len()
869 )));
870 }
871 v.push(i32::from_le_bytes([b[0], b[1], b[2], b[3]]));
872 Ok(())
873 }
874 _ => Err(AvroError::InvalidArgument(
875 "Default for interval-year-month must be bytes (4-byte LE)".to_string(),
876 )),
877 },
878 #[cfg(feature = "avro_custom_types")]
879 Self::IntervalMonthDayNano(v) => match lit {
880 AvroLiteral::Bytes(b) => {
881 if b.len() != 16 {
882 return Err(AvroError::InvalidArgument(format!(
883 "interval-month-day-nano default must be exactly 16 bytes, got {}",
884 b.len()
885 )));
886 }
887 let months = i32::from_le_bytes([b[0], b[1], b[2], b[3]]);
888 let days = i32::from_le_bytes([b[4], b[5], b[6], b[7]]);
889 let nanos =
890 i64::from_le_bytes([b[8], b[9], b[10], b[11], b[12], b[13], b[14], b[15]]);
891 v.push(IntervalMonthDayNano::new(months, days, nanos));
892 Ok(())
893 }
894 _ => Err(AvroError::InvalidArgument(
895 "Default for interval-month-day-nano must be bytes (16-byte LE)".to_string(),
896 )),
897 },
898 #[cfg(feature = "avro_custom_types")]
899 Self::IntervalDayTime(v) => match lit {
900 AvroLiteral::Bytes(b) => {
901 if b.len() != 8 {
902 return Err(AvroError::InvalidArgument(format!(
903 "interval-day-time default must be exactly 8 bytes, got {}",
904 b.len()
905 )));
906 }
907 let days = i32::from_le_bytes([b[0], b[1], b[2], b[3]]);
908 let milliseconds = i32::from_le_bytes([b[4], b[5], b[6], b[7]]);
909 v.push(IntervalDayTime::new(days, milliseconds));
910 Ok(())
911 }
912 _ => Err(AvroError::InvalidArgument(
913 "Default for interval-day-time must be bytes (8-byte LE)".to_string(),
914 )),
915 },
916 Self::Int64(v)
917 | Self::Int32ToInt64(v)
918 | Self::TimeMicros(v)
919 | Self::TimestampMillis(_, v)
920 | Self::TimestampMicros(_, v)
921 | Self::TimestampNanos(_, v) => match lit {
922 AvroLiteral::Long(i) => {
923 v.push(*i);
924 Ok(())
925 }
926 AvroLiteral::Int(i) => {
927 v.push(*i as i64);
928 Ok(())
929 }
930 _ => Err(AvroError::InvalidArgument(
931 "Default for long/time-micros/timestamp must be long or int".to_string(),
932 )),
933 },
934 Self::Float32(v) | Self::Int32ToFloat32(v) | Self::Int64ToFloat32(v) => match lit {
935 AvroLiteral::Float(f) => {
936 v.push(*f);
937 Ok(())
938 }
939 _ => Err(AvroError::InvalidArgument(
940 "Default for float must be float".to_string(),
941 )),
942 },
943 Self::Float64(v)
944 | Self::Int32ToFloat64(v)
945 | Self::Int64ToFloat64(v)
946 | Self::Float32ToFloat64(v) => match lit {
947 AvroLiteral::Double(f) => {
948 v.push(*f);
949 Ok(())
950 }
951 _ => Err(AvroError::InvalidArgument(
952 "Default for double must be double".to_string(),
953 )),
954 },
955 Self::Binary(offsets, values) | Self::StringToBytes(offsets, values) => match lit {
956 AvroLiteral::Bytes(b) => {
957 offsets.push_length(b.len());
958 values.extend_from_slice(b);
959 Ok(())
960 }
961 _ => Err(AvroError::InvalidArgument(
962 "Default for bytes must be bytes".to_string(),
963 )),
964 },
965 Self::BytesToString(offsets, values)
966 | Self::String(offsets, values)
967 | Self::StringView(offsets, values) => match lit {
968 AvroLiteral::String(s) => {
969 let b = s.as_bytes();
970 offsets.push_length(b.len());
971 values.extend_from_slice(b);
972 Ok(())
973 }
974 _ => Err(AvroError::InvalidArgument(
975 "Default for string must be string".to_string(),
976 )),
977 },
978 Self::Uuid(values) => match lit {
979 AvroLiteral::String(s) => {
980 let uuid = Uuid::try_parse(s).map_err(|e| {
981 AvroError::InvalidArgument(format!("Invalid UUID default: {s} ({e})"))
982 })?;
983 values.extend_from_slice(uuid.as_bytes());
984 Ok(())
985 }
986 _ => Err(AvroError::InvalidArgument(
987 "Default for uuid must be string".to_string(),
988 )),
989 },
990 Self::Fixed(sz, accum) => match lit {
991 AvroLiteral::Bytes(b) => {
992 if b.len() != *sz as usize {
993 return Err(AvroError::InvalidArgument(format!(
994 "Fixed default length {} does not match size {sz}",
995 b.len(),
996 )));
997 }
998 accum.extend_from_slice(b);
999 Ok(())
1000 }
1001 _ => Err(AvroError::InvalidArgument(
1002 "Default for fixed must be bytes".to_string(),
1003 )),
1004 },
1005 #[cfg(feature = "small_decimals")]
1006 Self::Decimal32(_, _, _, builder) => {
1007 append_decimal_default!(lit, builder, 4, i32, "decimal32")
1008 }
1009 #[cfg(feature = "small_decimals")]
1010 Self::Decimal64(_, _, _, builder) => {
1011 append_decimal_default!(lit, builder, 8, i64, "decimal64")
1012 }
1013 Self::Decimal128(_, _, _, builder) => {
1014 append_decimal_default!(lit, builder, 16, i128, "decimal128")
1015 }
1016 Self::Decimal256(_, _, _, builder) => {
1017 append_decimal_default!(lit, builder, 32, i256, "decimal256")
1018 }
1019 Self::Duration(builder) => match lit {
1020 AvroLiteral::Bytes(b) => {
1021 if b.len() != 12 {
1022 return Err(AvroError::InvalidArgument(format!(
1023 "Duration default must be exactly 12 bytes, got {}",
1024 b.len()
1025 )));
1026 }
1027 let months = u32::from_le_bytes([b[0], b[1], b[2], b[3]]);
1028 let days = u32::from_le_bytes([b[4], b[5], b[6], b[7]]);
1029 let millis = u32::from_le_bytes([b[8], b[9], b[10], b[11]]);
1030 let nanos = (millis as i64) * 1_000_000;
1031 builder.append_value(IntervalMonthDayNano::new(
1032 months as i32,
1033 days as i32,
1034 nanos,
1035 ));
1036 Ok(())
1037 }
1038 _ => Err(AvroError::InvalidArgument(
1039 "Default for duration must be 12-byte little-endian months/days/millis"
1040 .to_string(),
1041 )),
1042 },
1043 Self::Array(_, offsets, inner) => match lit {
1044 AvroLiteral::Array(items) => {
1045 offsets.push_length(items.len());
1046 for item in items {
1047 inner.append_default(item)?;
1048 }
1049 Ok(())
1050 }
1051 _ => Err(AvroError::InvalidArgument(
1052 "Default for array must be an array literal".to_string(),
1053 )),
1054 },
1055 Self::Map(_, koff, moff, kdata, valdec) => match lit {
1056 AvroLiteral::Map(entries) => {
1057 moff.push_length(entries.len());
1058 for (k, v) in entries {
1059 let kb = k.as_bytes();
1060 koff.push_length(kb.len());
1061 kdata.extend_from_slice(kb);
1062 valdec.append_default(v)?;
1063 }
1064 Ok(())
1065 }
1066 _ => Err(AvroError::InvalidArgument(
1067 "Default for map must be a map/object literal".to_string(),
1068 )),
1069 },
1070 Self::Enum(indices, symbols, _) => match lit {
1071 AvroLiteral::Enum(sym) => {
1072 let pos = symbols.iter().position(|s| s == sym).ok_or_else(|| {
1073 AvroError::InvalidArgument(format!(
1074 "Enum default symbol {sym:?} not in reader symbols"
1075 ))
1076 })?;
1077 indices.push(pos as i32);
1078 Ok(())
1079 }
1080 _ => Err(AvroError::InvalidArgument(
1081 "Default for enum must be a symbol".to_string(),
1082 )),
1083 },
1084 #[cfg(feature = "avro_custom_types")]
1085 Self::RunEndEncoded(_, len, inner) => {
1086 *len += 1;
1087 inner.append_default(lit)
1088 }
1089 Self::Union(u) => u.append_default(lit),
1090 Self::Record(field_meta, decoders, projector) => match lit {
1091 AvroLiteral::Map(entries) => {
1092 for (i, dec) in decoders.iter_mut().enumerate() {
1093 let name = field_meta[i].name();
1094 if let Some(sub) = entries.get(name) {
1095 dec.append_default(sub)?;
1096 } else if let Some(proj) = projector.as_ref() {
1097 proj.project_default(dec, i)?;
1098 } else {
1099 dec.append_null()?;
1100 }
1101 }
1102 Ok(())
1103 }
1104 AvroLiteral::Null => {
1105 for (i, dec) in decoders.iter_mut().enumerate() {
1106 if let Some(proj) = projector.as_ref() {
1107 proj.project_default(dec, i)?;
1108 } else {
1109 dec.append_null()?;
1110 }
1111 }
1112 Ok(())
1113 }
1114 _ => Err(AvroError::InvalidArgument(
1115 "Default for record must be a map/object or null".to_string(),
1116 )),
1117 },
1118 }
1119 }
1120
1121 fn decode(&mut self, buf: &mut AvroCursor<'_>) -> Result<(), AvroError> {
1123 match self {
1124 Self::Null(x) => *x += 1,
1125 Self::Boolean(values) => values.append(buf.get_bool()?),
1126 Self::Int32(values) | Self::Date32(values) | Self::TimeMillis(values) => {
1127 values.push(buf.get_int()?)
1128 }
1129 Self::Int64(values)
1130 | Self::TimeMicros(values)
1131 | Self::TimestampMillis(_, values)
1132 | Self::TimestampMicros(_, values)
1133 | Self::TimestampNanos(_, values) => values.push(buf.get_long()?),
1134 #[cfg(feature = "avro_custom_types")]
1135 Self::DurationSecond(values)
1136 | Self::DurationMillisecond(values)
1137 | Self::DurationMicrosecond(values)
1138 | Self::DurationNanosecond(values) => values.push(buf.get_long()?),
1139 #[cfg(feature = "avro_custom_types")]
1140 Self::Int8(values) => {
1141 let raw = buf.get_int()?;
1142 let x = i8::try_from(raw).map_err(|_| {
1143 AvroError::ParseError(format!("int8 value {raw} out of range for i8"))
1144 })?;
1145 values.push(x);
1146 }
1147 #[cfg(feature = "avro_custom_types")]
1148 Self::Int16(values) => {
1149 let raw = buf.get_int()?;
1150 let x = i16::try_from(raw).map_err(|_| {
1151 AvroError::ParseError(format!("int16 value {raw} out of range for i16"))
1152 })?;
1153 values.push(x);
1154 }
1155 #[cfg(feature = "avro_custom_types")]
1156 Self::UInt8(values) => {
1157 let raw = buf.get_int()?;
1158 let x = u8::try_from(raw).map_err(|_| {
1159 AvroError::ParseError(format!("uint8 value {raw} out of range for u8"))
1160 })?;
1161 values.push(x);
1162 }
1163 #[cfg(feature = "avro_custom_types")]
1164 Self::UInt16(values) => {
1165 let raw = buf.get_int()?;
1166 let x = u16::try_from(raw).map_err(|_| {
1167 AvroError::ParseError(format!("uint16 value {raw} out of range for u16"))
1168 })?;
1169 values.push(x);
1170 }
1171 #[cfg(feature = "avro_custom_types")]
1172 Self::UInt32(values) => {
1173 let raw = buf.get_long()?;
1174 let x = u32::try_from(raw).map_err(|_| {
1175 AvroError::ParseError(format!("uint32 value {raw} out of range for u32"))
1176 })?;
1177 values.push(x);
1178 }
1179 #[cfg(feature = "avro_custom_types")]
1180 Self::UInt64(values) => {
1181 let b = buf.get_fixed(8)?;
1182 values.push(u64::from_le_bytes([
1183 b[0], b[1], b[2], b[3], b[4], b[5], b[6], b[7],
1184 ]));
1185 }
1186 #[cfg(feature = "avro_custom_types")]
1187 Self::Float16(values) => {
1188 let b = buf.get_fixed(2)?;
1189 values.push(u16::from_le_bytes([b[0], b[1]]));
1190 }
1191 #[cfg(feature = "avro_custom_types")]
1192 Self::Date64(values) | Self::TimeNanos(values) | Self::TimestampSecs(_, values) => {
1193 values.push(buf.get_long()?)
1194 }
1195 #[cfg(feature = "avro_custom_types")]
1196 Self::Time32Secs(values) => values.push(buf.get_int()?),
1197 #[cfg(feature = "avro_custom_types")]
1198 Self::IntervalYearMonth(values) => {
1199 let b = buf.get_fixed(4)?;
1200 values.push(i32::from_le_bytes([b[0], b[1], b[2], b[3]]));
1201 }
1202 #[cfg(feature = "avro_custom_types")]
1203 Self::IntervalMonthDayNano(values) => {
1204 let b = buf.get_fixed(16)?;
1205 let months = i32::from_le_bytes([b[0], b[1], b[2], b[3]]);
1206 let days = i32::from_le_bytes([b[4], b[5], b[6], b[7]]);
1207 let nanos =
1208 i64::from_le_bytes([b[8], b[9], b[10], b[11], b[12], b[13], b[14], b[15]]);
1209 values.push(IntervalMonthDayNano::new(months, days, nanos));
1210 }
1211 #[cfg(feature = "avro_custom_types")]
1212 Self::IntervalDayTime(values) => {
1213 let b = buf.get_fixed(8)?;
1214 let days = i32::from_le_bytes([b[0], b[1], b[2], b[3]]);
1216 let milliseconds = i32::from_le_bytes([b[4], b[5], b[6], b[7]]);
1217 values.push(IntervalDayTime::new(days, milliseconds));
1218 }
1219 Self::Float32(values) => values.push(buf.get_float()?),
1220 Self::Float64(values) => values.push(buf.get_double()?),
1221 Self::Int32ToInt64(values) => values.push(buf.get_int()? as i64),
1222 Self::Int32ToFloat32(values) => values.push(buf.get_int()? as f32),
1223 Self::Int32ToFloat64(values) => values.push(buf.get_int()? as f64),
1224 Self::Int64ToFloat32(values) => values.push(buf.get_long()? as f32),
1225 Self::Int64ToFloat64(values) => values.push(buf.get_long()? as f64),
1226 Self::Float32ToFloat64(values) => values.push(buf.get_float()? as f64),
1227 Self::StringToBytes(offsets, values)
1228 | Self::BytesToString(offsets, values)
1229 | Self::Binary(offsets, values)
1230 | Self::String(offsets, values)
1231 | Self::StringView(offsets, values) => {
1232 let data = buf.get_bytes()?;
1233 offsets.push_length(data.len());
1234 values.extend_from_slice(data);
1235 }
1236 Self::Uuid(values) => {
1237 let s_bytes = buf.get_bytes()?;
1238 let s = std::str::from_utf8(s_bytes).map_err(|e| {
1239 AvroError::ParseError(format!("UUID bytes are not valid UTF-8: {e}"))
1240 })?;
1241 let uuid = Uuid::try_parse(s)
1242 .map_err(|e| AvroError::ParseError(format!("Failed to parse uuid: {e}")))?;
1243 values.extend_from_slice(uuid.as_bytes());
1244 }
1245 Self::Array(_, off, encoding) => {
1246 let total_items = read_blocks(buf, |cursor| encoding.decode(cursor))?;
1247 off.push_length(total_items);
1248 }
1249 Self::Record(_, encodings, None) => {
1250 for encoding in encodings {
1251 encoding.decode(buf)?;
1252 }
1253 }
1254 Self::Record(_, encodings, Some(proj)) => {
1255 proj.project_record(buf, encodings)?;
1256 }
1257 Self::Map(_, koff, moff, kdata, valdec) => {
1258 let newly_added = read_blocks(buf, |cur| {
1259 let kb = cur.get_bytes()?;
1260 koff.push_length(kb.len());
1261 kdata.extend_from_slice(kb);
1262 valdec.decode(cur)
1263 })?;
1264 moff.push_length(newly_added);
1265 }
1266 Self::Fixed(sz, accum) => {
1267 let fx = buf.get_fixed(*sz as usize)?;
1268 accum.extend_from_slice(fx);
1269 }
1270 #[cfg(feature = "small_decimals")]
1271 Self::Decimal32(_, _, size, builder) => {
1272 decode_decimal!(size, buf, builder, 4, i32);
1273 }
1274 #[cfg(feature = "small_decimals")]
1275 Self::Decimal64(_, _, size, builder) => {
1276 decode_decimal!(size, buf, builder, 8, i64);
1277 }
1278 Self::Decimal128(_, _, size, builder) => {
1279 decode_decimal!(size, buf, builder, 16, i128);
1280 }
1281 Self::Decimal256(_, _, size, builder) => {
1282 decode_decimal!(size, buf, builder, 32, i256);
1283 }
1284 Self::Enum(indices, _, None) => {
1285 indices.push(buf.get_int()?);
1286 }
1287 Self::Enum(indices, _, Some(res)) => {
1288 let raw = buf.get_int()?;
1289 let resolved = usize::try_from(raw)
1290 .ok()
1291 .and_then(|idx| res.mapping.get(idx).copied())
1292 .filter(|&idx| idx >= 0)
1293 .unwrap_or(res.default_index);
1294 if resolved >= 0 {
1295 indices.push(resolved);
1296 } else {
1297 return Err(AvroError::ParseError(format!(
1298 "Enum symbol index {raw} not resolvable and no default provided",
1299 )));
1300 }
1301 }
1302 Self::Duration(builder) => {
1303 let b = buf.get_fixed(12)?;
1304 let months = u32::from_le_bytes([b[0], b[1], b[2], b[3]]);
1305 let days = u32::from_le_bytes([b[4], b[5], b[6], b[7]]);
1306 let millis = u32::from_le_bytes([b[8], b[9], b[10], b[11]]);
1307 let nanos = (millis as i64) * 1_000_000;
1308 builder.append_value(IntervalMonthDayNano::new(months as i32, days as i32, nanos));
1309 }
1310 #[cfg(feature = "avro_custom_types")]
1311 Self::RunEndEncoded(_, len, inner) => {
1312 *len += 1;
1313 inner.decode(buf)?;
1314 }
1315 Self::Union(u) => u.decode(buf)?,
1316 Self::Nullable(order, nb, encoding, plan) => match *plan {
1317 NullablePlan::FromSingle { promotion } => {
1318 encoding.decode_with_promotion(buf, promotion)?;
1319 nb.append(true);
1320 }
1321 NullablePlan::ReadTag => {
1322 let branch = buf.read_vlq()?;
1323 let is_not_null = match *order {
1324 Nullability::NullFirst => branch != 0,
1325 Nullability::NullSecond => branch == 0,
1326 };
1327 if is_not_null {
1328 encoding.decode(buf)?;
1330 } else {
1331 encoding.append_null()?;
1332 }
1333 nb.append(is_not_null);
1334 }
1335 },
1336 }
1337 Ok(())
1338 }
1339
1340 fn decode_with_promotion(
1341 &mut self,
1342 buf: &mut AvroCursor<'_>,
1343 promotion: Promotion,
1344 ) -> Result<(), AvroError> {
1345 #[cfg(feature = "avro_custom_types")]
1346 if let Self::RunEndEncoded(_, len, inner) = self {
1347 *len += 1;
1348 return inner.decode_with_promotion(buf, promotion);
1349 }
1350
1351 macro_rules! promote_numeric_to {
1352 ($variant:ident, $getter:ident, $to:ty) => {{
1353 match self {
1354 Self::$variant(v) => {
1355 let x = buf.$getter()?;
1356 v.push(x as $to);
1357 Ok(())
1358 }
1359 other => Err(AvroError::ParseError(format!(
1360 "Promotion {promotion} target mismatch: expected {}, got {}",
1361 stringify!($variant),
1362 <Self as ::std::convert::AsRef<str>>::as_ref(other)
1363 ))),
1364 }
1365 }};
1366 }
1367 match promotion {
1368 Promotion::Direct => self.decode(buf),
1369 Promotion::IntToLong => promote_numeric_to!(Int64, get_int, i64),
1370 Promotion::IntToFloat => promote_numeric_to!(Float32, get_int, f32),
1371 Promotion::IntToDouble => promote_numeric_to!(Float64, get_int, f64),
1372 Promotion::LongToFloat => promote_numeric_to!(Float32, get_long, f32),
1373 Promotion::LongToDouble => promote_numeric_to!(Float64, get_long, f64),
1374 Promotion::FloatToDouble => promote_numeric_to!(Float64, get_float, f64),
1375 Promotion::StringToBytes => match self {
1376 Self::Binary(offsets, values) | Self::StringToBytes(offsets, values) => {
1377 let data = buf.get_bytes()?;
1378 offsets.push_length(data.len());
1379 values.extend_from_slice(data);
1380 Ok(())
1381 }
1382 other => Err(AvroError::ParseError(format!(
1383 "Promotion {promotion} target mismatch: expected bytes (Binary/StringToBytes), got {}",
1384 <Self as AsRef<str>>::as_ref(other)
1385 ))),
1386 },
1387 Promotion::BytesToString => match self {
1388 Self::String(offsets, values)
1389 | Self::StringView(offsets, values)
1390 | Self::BytesToString(offsets, values) => {
1391 let data = buf.get_bytes()?;
1392 offsets.push_length(data.len());
1393 values.extend_from_slice(data);
1394 Ok(())
1395 }
1396 other => Err(AvroError::ParseError(format!(
1397 "Promotion {promotion} target mismatch: expected string (String/StringView/BytesToString), got {}",
1398 <Self as AsRef<str>>::as_ref(other)
1399 ))),
1400 },
1401 }
1402 }
1403
1404 fn flush(&mut self, nulls: Option<NullBuffer>) -> Result<ArrayRef, AvroError> {
1406 Ok(match self {
1407 Self::Nullable(_, n, e, _) => e.flush(n.finish())?,
1408 Self::Null(size) => Arc::new(NullArray::new(std::mem::replace(size, 0))),
1409 Self::Boolean(b) => Arc::new(BooleanArray::new(b.finish(), nulls)),
1410 Self::Int32(values) => Arc::new(flush_primitive::<Int32Type>(values, nulls)),
1411 Self::Date32(values) => Arc::new(flush_primitive::<Date32Type>(values, nulls)),
1412 Self::Int64(values) => Arc::new(flush_primitive::<Int64Type>(values, nulls)),
1413 Self::TimeMillis(values) => {
1414 Arc::new(flush_primitive::<Time32MillisecondType>(values, nulls))
1415 }
1416 Self::TimeMicros(values) => {
1417 Arc::new(flush_primitive::<Time64MicrosecondType>(values, nulls))
1418 }
1419 Self::TimestampMillis(is_utc, values) => Arc::new(
1420 flush_primitive::<TimestampMillisecondType>(values, nulls)
1421 .with_timezone_opt(is_utc.then(|| "+00:00")),
1422 ),
1423 Self::TimestampMicros(is_utc, values) => Arc::new(
1424 flush_primitive::<TimestampMicrosecondType>(values, nulls)
1425 .with_timezone_opt(is_utc.then(|| "+00:00")),
1426 ),
1427 Self::TimestampNanos(is_utc, values) => Arc::new(
1428 flush_primitive::<TimestampNanosecondType>(values, nulls)
1429 .with_timezone_opt(is_utc.then(|| "+00:00")),
1430 ),
1431 #[cfg(feature = "avro_custom_types")]
1432 Self::DurationSecond(values) => {
1433 Arc::new(flush_primitive::<DurationSecondType>(values, nulls))
1434 }
1435 #[cfg(feature = "avro_custom_types")]
1436 Self::DurationMillisecond(values) => {
1437 Arc::new(flush_primitive::<DurationMillisecondType>(values, nulls))
1438 }
1439 #[cfg(feature = "avro_custom_types")]
1440 Self::DurationMicrosecond(values) => {
1441 Arc::new(flush_primitive::<DurationMicrosecondType>(values, nulls))
1442 }
1443 #[cfg(feature = "avro_custom_types")]
1444 Self::DurationNanosecond(values) => {
1445 Arc::new(flush_primitive::<DurationNanosecondType>(values, nulls))
1446 }
1447 #[cfg(feature = "avro_custom_types")]
1448 Self::Int8(values) => Arc::new(flush_primitive::<Int8Type>(values, nulls)),
1449 #[cfg(feature = "avro_custom_types")]
1450 Self::Int16(values) => Arc::new(flush_primitive::<Int16Type>(values, nulls)),
1451 #[cfg(feature = "avro_custom_types")]
1452 Self::UInt8(values) => Arc::new(flush_primitive::<UInt8Type>(values, nulls)),
1453 #[cfg(feature = "avro_custom_types")]
1454 Self::UInt16(values) => Arc::new(flush_primitive::<UInt16Type>(values, nulls)),
1455 #[cfg(feature = "avro_custom_types")]
1456 Self::UInt32(values) => Arc::new(flush_primitive::<UInt32Type>(values, nulls)),
1457 #[cfg(feature = "avro_custom_types")]
1458 Self::UInt64(values) => Arc::new(flush_primitive::<UInt64Type>(values, nulls)),
1459 #[cfg(feature = "avro_custom_types")]
1460 Self::Float16(values) => {
1461 let len = values.len();
1464 let buf: Buffer = std::mem::take(values).into();
1465 let scalar_buf = ScalarBuffer::new(buf, 0, len);
1466 Arc::new(Float16Array::new(scalar_buf, nulls))
1467 }
1468 #[cfg(feature = "avro_custom_types")]
1469 Self::Date64(values) => Arc::new(flush_primitive::<Date64Type>(values, nulls)),
1470 #[cfg(feature = "avro_custom_types")]
1471 Self::TimeNanos(values) => {
1472 Arc::new(flush_primitive::<Time64NanosecondType>(values, nulls))
1473 }
1474 #[cfg(feature = "avro_custom_types")]
1475 Self::Time32Secs(values) => {
1476 Arc::new(flush_primitive::<Time32SecondType>(values, nulls))
1477 }
1478 #[cfg(feature = "avro_custom_types")]
1479 Self::TimestampSecs(is_utc, values) => Arc::new(
1480 flush_primitive::<TimestampSecondType>(values, nulls)
1481 .with_timezone_opt(is_utc.then(|| "+00:00")),
1482 ),
1483 #[cfg(feature = "avro_custom_types")]
1484 Self::IntervalYearMonth(values) => {
1485 Arc::new(flush_primitive::<IntervalYearMonthType>(values, nulls))
1486 }
1487 #[cfg(feature = "avro_custom_types")]
1488 Self::IntervalMonthDayNano(values) => {
1489 Arc::new(flush_primitive::<IntervalMonthDayNanoType>(values, nulls))
1490 }
1491 #[cfg(feature = "avro_custom_types")]
1492 Self::IntervalDayTime(values) => {
1493 Arc::new(flush_primitive::<IntervalDayTimeType>(values, nulls))
1494 }
1495 Self::Float32(values) => Arc::new(flush_primitive::<Float32Type>(values, nulls)),
1496 Self::Float64(values) => Arc::new(flush_primitive::<Float64Type>(values, nulls)),
1497 Self::Int32ToInt64(values) => Arc::new(flush_primitive::<Int64Type>(values, nulls)),
1498 Self::Int32ToFloat32(values) | Self::Int64ToFloat32(values) => {
1499 Arc::new(flush_primitive::<Float32Type>(values, nulls))
1500 }
1501 Self::Int32ToFloat64(values)
1502 | Self::Int64ToFloat64(values)
1503 | Self::Float32ToFloat64(values) => {
1504 Arc::new(flush_primitive::<Float64Type>(values, nulls))
1505 }
1506 Self::StringToBytes(offsets, values) | Self::Binary(offsets, values) => {
1507 let offsets = flush_offsets(offsets);
1508 let values = flush_values(values).into();
1509 Arc::new(BinaryArray::try_new(offsets, values, nulls)?)
1510 }
1511 Self::BytesToString(offsets, values) | Self::String(offsets, values) => {
1512 let offsets = flush_offsets(offsets);
1513 let values = flush_values(values).into();
1514 Arc::new(StringArray::try_new(offsets, values, nulls)?)
1515 }
1516 Self::StringView(offsets, values) => {
1517 let offsets = flush_offsets(offsets);
1518 let values = flush_values(values);
1519 let array = StringArray::try_new(offsets, values.into(), nulls.clone())?;
1520 let values: Vec<&str> = (0..array.len())
1521 .map(|i| {
1522 if array.is_valid(i) {
1523 array.value(i)
1524 } else {
1525 ""
1526 }
1527 })
1528 .collect();
1529 Arc::new(StringViewArray::from(values))
1530 }
1531 Self::Array(field, offsets, values) => {
1532 let values = values.flush(None)?;
1533 let offsets = flush_offsets(offsets);
1534 Arc::new(ListArray::try_new(field.clone(), offsets, values, nulls)?)
1535 }
1536 Self::Record(fields, encodings, _) => {
1537 let arrays = encodings
1538 .iter_mut()
1539 .map(|x| x.flush(None))
1540 .collect::<Result<Vec<_>, _>>()?;
1541 Arc::new(StructArray::try_new(fields.clone(), arrays, nulls)?)
1542 }
1543 Self::Map(map_field, k_off, m_off, kdata, valdec) => {
1544 let moff = flush_offsets(m_off);
1545 let koff = flush_offsets(k_off);
1546 let kd = flush_values(kdata).into();
1547 let val_arr = valdec.flush(None)?;
1548 let key_arr = StringArray::try_new(koff, kd, None)?;
1549 if key_arr.len() != val_arr.len() {
1550 return Err(AvroError::InvalidArgument(format!(
1551 "Map keys length ({}) != map values length ({})",
1552 key_arr.len(),
1553 val_arr.len()
1554 )));
1555 }
1556 let final_len = moff.len() - 1;
1557 if let Some(n) = &nulls {
1558 if n.len() != final_len {
1559 return Err(AvroError::InvalidArgument(format!(
1560 "Map array null buffer length {} != final map length {final_len}",
1561 n.len()
1562 )));
1563 }
1564 }
1565 let entries_fields = match map_field.data_type() {
1566 DataType::Struct(fields) => fields.clone(),
1567 other => {
1568 return Err(AvroError::InvalidArgument(format!(
1569 "Map entries field must be a Struct, got {other:?}"
1570 )));
1571 }
1572 };
1573 let entries_struct =
1574 StructArray::try_new(entries_fields, vec![Arc::new(key_arr), val_arr], None)?;
1575 let map_arr =
1576 MapArray::try_new(map_field.clone(), moff, entries_struct, nulls, false)?;
1577 Arc::new(map_arr)
1578 }
1579 Self::Fixed(sz, accum) => {
1580 let b: Buffer = flush_values(accum).into();
1581 let arr = FixedSizeBinaryArray::try_new(*sz, b, nulls)
1582 .map_err(|e| AvroError::ParseError(e.to_string()))?;
1583 Arc::new(arr)
1584 }
1585 Self::Uuid(values) => {
1586 let arr = FixedSizeBinaryArray::try_new(16, std::mem::take(values).into(), nulls)
1587 .map_err(|e| AvroError::ParseError(e.to_string()))?;
1588 Arc::new(arr)
1589 }
1590 #[cfg(feature = "small_decimals")]
1591 Self::Decimal32(precision, scale, _, builder) => {
1592 flush_decimal!(builder, precision, scale, nulls, Decimal32Array)
1593 }
1594 #[cfg(feature = "small_decimals")]
1595 Self::Decimal64(precision, scale, _, builder) => {
1596 flush_decimal!(builder, precision, scale, nulls, Decimal64Array)
1597 }
1598 Self::Decimal128(precision, scale, _, builder) => {
1599 flush_decimal!(builder, precision, scale, nulls, Decimal128Array)
1600 }
1601 Self::Decimal256(precision, scale, _, builder) => {
1602 flush_decimal!(builder, precision, scale, nulls, Decimal256Array)
1603 }
1604 Self::Enum(indices, symbols, _) => flush_dict(indices, symbols, nulls)?,
1605 Self::Duration(builder) => {
1606 let (_, vals, _) = builder.finish().into_parts();
1607 let vals = IntervalMonthDayNanoArray::try_new(vals, nulls)
1608 .map_err(|e| AvroError::ParseError(e.to_string()))?;
1609 Arc::new(vals)
1610 }
1611 #[cfg(feature = "avro_custom_types")]
1612 Self::RunEndEncoded(width, len, inner) => {
1613 let values = inner.flush(nulls)?;
1614 let n = *len;
1615 let arr = values.as_ref();
1616 let mut run_starts: Vec<usize> = Vec::with_capacity(n);
1617 if n > 0 {
1618 run_starts.push(0);
1619 for i in 1..n {
1620 if !values_equal_at(arr, i - 1, i) {
1621 run_starts.push(i);
1622 }
1623 }
1624 }
1625 if n > (u32::MAX as usize) {
1626 return Err(AvroError::InvalidArgument(format!(
1627 "RunEndEncoded length {n} exceeds maximum supported by UInt32 indices for take",
1628 )));
1629 }
1630 let run_count = run_starts.len();
1631 let take_idx: PrimitiveArray<UInt32Type> =
1632 run_starts.iter().map(|&s| s as u32).collect();
1633 let per_run_values = if run_count == 0 {
1634 values.slice(0, 0)
1635 } else {
1636 take(arr, &take_idx, Option::from(TakeOptions::default())).map_err(|e| {
1637 AvroError::ParseError(format!("take() for REE values failed: {e}"))
1638 })?
1639 };
1640
1641 macro_rules! build_run_array {
1642 ($Native:ty, $ArrowTy:ty) => {{
1643 let mut ends: Vec<$Native> = Vec::with_capacity(run_count);
1644 for (idx, &_start) in run_starts.iter().enumerate() {
1645 let end = if idx + 1 < run_count {
1646 run_starts[idx + 1]
1647 } else {
1648 n
1649 };
1650 ends.push(end as $Native);
1651 }
1652 let ends: PrimitiveArray<$ArrowTy> = ends.into_iter().collect();
1653 let run_arr = RunArray::<$ArrowTy>::try_new(&ends, per_run_values.as_ref())
1654 .map_err(|e| AvroError::ParseError(e.to_string()))?;
1655 Arc::new(run_arr) as ArrayRef
1656 }};
1657 }
1658 match *width {
1659 2 => {
1660 if n > i16::MAX as usize {
1661 return Err(AvroError::InvalidArgument(format!(
1662 "RunEndEncoded length {n} exceeds i16::MAX for run end width 2"
1663 )));
1664 }
1665 build_run_array!(i16, Int16Type)
1666 }
1667 4 => build_run_array!(i32, Int32Type),
1668 8 => build_run_array!(i64, Int64Type),
1669 other => {
1670 return Err(AvroError::InvalidArgument(format!(
1671 "Unsupported run-end width {other} for RunEndEncoded"
1672 )));
1673 }
1674 }
1675 }
1676 Self::Union(u) => u.flush(nulls)?,
1677 })
1678 }
1679}
1680
/// Lookup table that maps a writer union branch to the reader branch it resolves to.
///
/// Both slices are indexed by writer branch index and are built with the same length.
#[derive(Debug)]
struct DispatchLookupTable {
    /// Reader branch index for each writer branch, or `NO_SOURCE` when the writer
    /// branch has no counterpart in the reader schema.
    to_reader: Box<[i8]>,
    /// Promotion to apply when decoding each writer branch; `Promotion::Direct` is stored
    /// as a placeholder for branches that do not resolve.
    promotion: Box<[Promotion]>,
}
1706
/// Sentinel stored in `DispatchLookupTable::to_reader` for a writer branch that does not
/// resolve to any reader branch. Valid reader indices are always non-negative.
const NO_SOURCE: i8 = -1;
1710
1711impl DispatchLookupTable {
1712 fn from_writer_to_reader(
1713 promotion_map: &[Option<(usize, Promotion)>],
1714 ) -> Result<Self, AvroError> {
1715 let mut to_reader = Vec::with_capacity(promotion_map.len());
1716 let mut promotion = Vec::with_capacity(promotion_map.len());
1717 for map in promotion_map {
1718 match *map {
1719 Some((idx, promo)) => {
1720 let idx_i8 = i8::try_from(idx).map_err(|_| {
1721 AvroError::SchemaError(format!(
1722 "Reader branch index {idx} exceeds i8 range (max {})",
1723 i8::MAX
1724 ))
1725 })?;
1726 to_reader.push(idx_i8);
1727 promotion.push(promo);
1728 }
1729 None => {
1730 to_reader.push(NO_SOURCE);
1731 promotion.push(Promotion::Direct);
1732 }
1733 }
1734 }
1735 Ok(Self {
1736 to_reader: to_reader.into_boxed_slice(),
1737 promotion: promotion.into_boxed_slice(),
1738 })
1739 }
1740
1741 #[inline]
1743 fn resolve(&self, writer_index: usize) -> Option<(usize, Promotion)> {
1744 let reader_index = *self.to_reader.get(writer_index)?;
1745 (reader_index >= 0).then(|| (reader_index as usize, self.promotion[writer_index]))
1746 }
1747}
1748
/// Decoder state for Avro unions, including unions that appear on only one side of a
/// writer/reader schema resolution.
#[derive(Debug)]
struct UnionDecoder {
    /// Arrow union fields passed to `UnionArray::try_new` when flushing.
    fields: UnionFields,
    /// Per-row type id of the branch each value was emitted to.
    type_ids: Vec<i8>,
    /// Per-row position of the value inside its branch's child array.
    offsets: Vec<i32>,
    /// One child decoder per reader union branch.
    branches: Vec<Decoder>,
    /// Number of values emitted to each branch so far; supplies the next offset.
    counts: Vec<i32>,
    /// Type ids taken from `fields`, in field order, indexed by reader branch.
    reader_type_codes: Vec<i8>,
    /// Branch that receives values appended through `append_default`.
    default_emit_idx: usize,
    /// Branch that receives values appended through `append_null`.
    null_emit_idx: usize,
    /// How the writer's encoding is mapped onto the reader's branches.
    plan: UnionReadPlan,
}
1761
1762impl Default for UnionDecoder {
1763 fn default() -> Self {
1764 Self {
1765 fields: UnionFields::empty(),
1766 type_ids: Vec::new(),
1767 offsets: Vec::new(),
1768 branches: Vec::new(),
1769 counts: Vec::new(),
1770 reader_type_codes: Vec::new(),
1771 default_emit_idx: 0,
1772 null_emit_idx: 0,
1773 plan: UnionReadPlan::Passthrough,
1774 }
1775 }
1776}
1777
/// Strategy used by `UnionDecoder` to turn the writer's encoding into reader branches.
#[derive(Debug)]
enum UnionReadPlan {
    /// Writer and reader are both unions: the branch tag read from the data is
    /// translated through `lookup_table` to a reader branch and promotion.
    ReaderUnion {
        lookup_table: DispatchLookupTable,
    },
    /// The writer is not a union but the reader is: no tag is read and every value is
    /// decoded into `reader_idx` using `promotion`.
    FromSingle {
        reader_idx: usize,
        promotion: Promotion,
    },
    /// The writer is a union but the reader is not: the tag is read, `lookup_table`
    /// supplies the promotion, and the value is decoded into `target`.
    ToSingle {
        target: Box<Decoder>,
        lookup_table: DispatchLookupTable,
    },
    /// No schema resolution: the tag read from the data is used directly as the reader
    /// branch index.
    Passthrough,
}
1793
1794impl UnionDecoder {
1795 fn try_new(
1796 fields: UnionFields,
1797 branches: Vec<Decoder>,
1798 resolved: Option<ResolvedUnion>,
1799 ) -> Result<Self, AvroError> {
1800 let reader_type_codes = fields.iter().map(|(tid, _)| tid).collect::<Vec<i8>>();
1801 let null_branch = branches.iter().position(|b| matches!(b, Decoder::Null(_)));
1802 let default_emit_idx = 0;
1803 let null_emit_idx = null_branch.unwrap_or(default_emit_idx);
1804 let branch_len = branches.len().max(reader_type_codes.len());
1805 let max_addr = (i32::MAX as usize) + 1;
1807 if branches.len() > max_addr {
1808 return Err(AvroError::SchemaError(format!(
1809 "Reader union has {} branches, which exceeds the maximum addressable \
1810 branches by an Avro int tag ({} + 1).",
1811 branches.len(),
1812 i32::MAX
1813 )));
1814 }
1815 Ok(Self {
1816 fields,
1817 type_ids: Vec::with_capacity(DEFAULT_CAPACITY),
1818 offsets: Vec::with_capacity(DEFAULT_CAPACITY),
1819 branches,
1820 counts: vec![0; branch_len],
1821 reader_type_codes,
1822 default_emit_idx,
1823 null_emit_idx,
1824 plan: Self::plan_from_resolved(resolved)?,
1825 })
1826 }
1827
1828 fn try_new_from_writer_union(
1829 info: ResolvedUnion,
1830 target: Box<Decoder>,
1831 ) -> Result<Self, AvroError> {
1832 debug_assert!(info.writer_is_union && !info.reader_is_union);
1834 let lookup_table = DispatchLookupTable::from_writer_to_reader(&info.writer_to_reader)?;
1835 Ok(Self {
1836 plan: UnionReadPlan::ToSingle {
1837 target,
1838 lookup_table,
1839 },
1840 ..Self::default()
1841 })
1842 }
1843
1844 fn plan_from_resolved(resolved: Option<ResolvedUnion>) -> Result<UnionReadPlan, AvroError> {
1845 let Some(info) = resolved else {
1846 return Ok(UnionReadPlan::Passthrough);
1847 };
1848 match (info.writer_is_union, info.reader_is_union) {
1849 (true, true) => {
1850 let lookup_table =
1851 DispatchLookupTable::from_writer_to_reader(&info.writer_to_reader)?;
1852 Ok(UnionReadPlan::ReaderUnion { lookup_table })
1853 }
1854 (false, true) => {
1855 let Some(&(reader_idx, promotion)) =
1856 info.writer_to_reader.first().and_then(Option::as_ref)
1857 else {
1858 return Err(AvroError::SchemaError(
1859 "Writer type does not match any reader union branch".to_string(),
1860 ));
1861 };
1862 Ok(UnionReadPlan::FromSingle {
1863 reader_idx,
1864 promotion,
1865 })
1866 }
1867 (true, false) => Err(AvroError::InvalidArgument(
1868 "UnionDecoder::try_new cannot build writer-union to single; use UnionDecoderBuilder with a target"
1869 .to_string(),
1870 )),
1871 _ => Err(AvroError::SchemaError(
1873 "ResolvedUnion constructed for non-union sides; resolver should return None"
1874 .to_string(),
1875 )),
1876 }
1877 }
1878
1879 #[inline]
1880 fn read_tag(buf: &mut AvroCursor<'_>) -> Result<usize, AvroError> {
1881 let raw = buf.get_long()?;
1886 if raw < 0 {
1887 return Err(AvroError::ParseError(format!(
1888 "Negative union branch index {raw}"
1889 )));
1890 }
1891 usize::try_from(raw).map_err(|_| {
1892 AvroError::ParseError(format!(
1893 "Union branch index {raw} does not fit into usize on this platform ({}-bit)",
1894 (usize::BITS as usize)
1895 ))
1896 })
1897 }
1898
1899 #[inline]
1900 fn emit_to(&mut self, reader_idx: usize) -> Result<&mut Decoder, AvroError> {
1901 let branches_len = self.branches.len();
1902 let Some(reader_branch) = self.branches.get_mut(reader_idx) else {
1903 return Err(AvroError::ParseError(format!(
1904 "Union branch index {reader_idx} out of range ({branches_len} branches)"
1905 )));
1906 };
1907 self.type_ids.push(self.reader_type_codes[reader_idx]);
1908 self.offsets.push(self.counts[reader_idx]);
1909 self.counts[reader_idx] += 1;
1910 Ok(reader_branch)
1911 }
1912
1913 #[inline]
1914 fn on_decoder<F>(&mut self, fallback_idx: usize, action: F) -> Result<(), AvroError>
1915 where
1916 F: FnOnce(&mut Decoder) -> Result<(), AvroError>,
1917 {
1918 if let UnionReadPlan::ToSingle { target, .. } = &mut self.plan {
1919 return action(target);
1920 }
1921 let reader_idx = match &self.plan {
1922 UnionReadPlan::FromSingle { reader_idx, .. } => *reader_idx,
1923 _ => fallback_idx,
1924 };
1925 self.emit_to(reader_idx).and_then(action)
1926 }
1927
1928 fn append_null(&mut self) -> Result<(), AvroError> {
1929 self.on_decoder(self.null_emit_idx, |decoder| decoder.append_null())
1930 }
1931
1932 fn append_default(&mut self, lit: &AvroLiteral) -> Result<(), AvroError> {
1933 self.on_decoder(self.default_emit_idx, |decoder| decoder.append_default(lit))
1934 }
1935
1936 fn decode(&mut self, buf: &mut AvroCursor<'_>) -> Result<(), AvroError> {
1937 let (reader_idx, promotion) = match &mut self.plan {
1938 UnionReadPlan::Passthrough => (Self::read_tag(buf)?, Promotion::Direct),
1939 UnionReadPlan::ReaderUnion { lookup_table } => {
1940 let idx = Self::read_tag(buf)?;
1941 lookup_table.resolve(idx).ok_or_else(|| {
1942 AvroError::ParseError(format!(
1943 "Union branch index {idx} not resolvable by reader schema"
1944 ))
1945 })?
1946 }
1947 UnionReadPlan::FromSingle {
1948 reader_idx,
1949 promotion,
1950 } => (*reader_idx, *promotion),
1951 UnionReadPlan::ToSingle {
1952 target,
1953 lookup_table,
1954 } => {
1955 let idx = Self::read_tag(buf)?;
1956 return match lookup_table.resolve(idx) {
1957 Some((_, promotion)) => target.decode_with_promotion(buf, promotion),
1958 None => Err(AvroError::ParseError(format!(
1959 "Writer union branch {idx} does not resolve to reader type"
1960 ))),
1961 };
1962 }
1963 };
1964 let decoder = self.emit_to(reader_idx)?;
1965 decoder.decode_with_promotion(buf, promotion)
1966 }
1967
1968 fn flush(&mut self, nulls: Option<NullBuffer>) -> Result<ArrayRef, AvroError> {
1969 if let UnionReadPlan::ToSingle { target, .. } = &mut self.plan {
1970 return target.flush(nulls);
1971 }
1972 debug_assert!(
1973 nulls.is_none(),
1974 "UnionArray does not accept a validity bitmap; \
1975 nulls should have been materialized as a Null child during decode"
1976 );
1977 let children = self
1978 .branches
1979 .iter_mut()
1980 .map(|d| d.flush(None))
1981 .collect::<Result<Vec<_>, _>>()?;
1982 let arr = UnionArray::try_new(
1983 self.fields.clone(),
1984 flush_values(&mut self.type_ids).into_iter().collect(),
1985 Some(flush_values(&mut self.offsets).into_iter().collect()),
1986 children,
1987 )
1988 .map_err(|e| AvroError::ParseError(e.to_string()))?;
1989 Ok(Arc::new(arr))
1990 }
1991}
1992
/// Collects the pieces needed to construct a `UnionDecoder`.
///
/// `build` accepts two configurations: `fields` + `branches` (with optional
/// `resolved`) and no `target`, or `resolved` + `target` with neither
/// `fields` nor `branches`.
#[derive(Debug, Default)]
struct UnionDecoderBuilder {
    /// Union fields handed to `UnionDecoder::try_new`.
    fields: Option<UnionFields>,
    /// Branch decoders handed to `UnionDecoder::try_new`.
    branches: Option<Vec<Decoder>>,
    /// Writer/reader union resolution info, if any.
    resolved: Option<ResolvedUnion>,
    /// Single target decoder used when a writer union resolves to a
    /// non-union reader type.
    target: Option<Box<Decoder>>,
}
2000
2001impl UnionDecoderBuilder {
2002 fn new() -> Self {
2003 Self::default()
2004 }
2005
2006 fn with_fields(mut self, fields: UnionFields) -> Self {
2007 self.fields = Some(fields);
2008 self
2009 }
2010
2011 fn with_branches(mut self, branches: Vec<Decoder>) -> Self {
2012 self.branches = Some(branches);
2013 self
2014 }
2015
2016 fn with_resolved_union(mut self, resolved_union: ResolvedUnion) -> Self {
2017 self.resolved = Some(resolved_union);
2018 self
2019 }
2020
2021 fn with_target(mut self, target: Box<Decoder>) -> Self {
2022 self.target = Some(target);
2023 self
2024 }
2025
2026 fn build(self) -> Result<UnionDecoder, AvroError> {
2027 match (self.resolved, self.fields, self.branches, self.target) {
2028 (resolved, Some(fields), Some(branches), None) => {
2029 UnionDecoder::try_new(fields, branches, resolved)
2030 }
2031 (Some(info), None, None, Some(target))
2032 if info.writer_is_union && !info.reader_is_union =>
2033 {
2034 UnionDecoder::try_new_from_writer_union(info, target)
2035 }
2036 _ => Err(AvroError::InvalidArgument(
2037 "Invalid UnionDecoderBuilder configuration: expected either \
2038 (fields + branches + resolved) with no target for reader-unions, or \
2039 (resolved + target) with no fields/branches for writer-union to single."
2040 .to_string(),
2041 )),
2042 }
2043 }
2044}
2045
/// Selects how `process_blockwise` handles a block whose item count is
/// negative (such a block carries an additional byte-size long).
#[derive(Debug, Copy, Clone)]
enum NegativeBlockBehavior {
    /// Read the byte size but still visit every item through the callback.
    ProcessItems,
    /// Jump over the whole block using its byte size; the callback is not
    /// invoked for the block's items.
    SkipBySize,
}
2051
2052#[inline]
2053fn skip_blocks(
2054 buf: &mut AvroCursor,
2055 mut skip_item: impl FnMut(&mut AvroCursor) -> Result<(), AvroError>,
2056) -> Result<usize, AvroError> {
2057 process_blockwise(
2058 buf,
2059 move |c| skip_item(c),
2060 NegativeBlockBehavior::SkipBySize,
2061 )
2062}
2063
2064#[inline]
2065fn flush_dict(
2066 indices: &mut Vec<i32>,
2067 symbols: &[String],
2068 nulls: Option<NullBuffer>,
2069) -> Result<ArrayRef, AvroError> {
2070 let keys = flush_primitive::<Int32Type>(indices, nulls);
2071 let values = Arc::new(StringArray::from_iter_values(
2072 symbols.iter().map(|s| s.as_str()),
2073 ));
2074 DictionaryArray::try_new(keys, values)
2075 .map_err(Into::into)
2076 .map(|arr| Arc::new(arr) as ArrayRef)
2077}
2078
2079#[inline]
2080fn read_blocks(
2081 buf: &mut AvroCursor,
2082 decode_entry: impl FnMut(&mut AvroCursor) -> Result<(), AvroError>,
2083) -> Result<usize, AvroError> {
2084 process_blockwise(buf, decode_entry, NegativeBlockBehavior::ProcessItems)
2085}
2086
2087#[inline]
2088fn process_blockwise(
2089 buf: &mut AvroCursor,
2090 mut on_item: impl FnMut(&mut AvroCursor) -> Result<(), AvroError>,
2091 negative_behavior: NegativeBlockBehavior,
2092) -> Result<usize, AvroError> {
2093 let mut total = 0usize;
2094 loop {
2095 let block_count = buf.get_long()?;
2100 match block_count.cmp(&0) {
2101 Ordering::Equal => break,
2102 Ordering::Less => {
2103 let count = (-block_count) as usize;
2104 let size_in_bytes = buf.get_long()? as usize;
2106 match negative_behavior {
2107 NegativeBlockBehavior::ProcessItems => {
2108 for _ in 0..count {
2110 on_item(buf)?;
2111 }
2112 }
2113 NegativeBlockBehavior::SkipBySize => {
2114 let _ = buf.get_fixed(size_in_bytes)?;
2116 }
2117 }
2118 total += count;
2119 }
2120 Ordering::Greater => {
2121 let count = block_count as usize;
2122 for _ in 0..count {
2123 on_item(buf)?;
2124 }
2125 total += count;
2126 }
2127 }
2128 }
2129 Ok(total)
2130}
2131
2132#[inline]
2133fn flush_values<T>(values: &mut Vec<T>) -> Vec<T> {
2134 std::mem::replace(values, Vec::with_capacity(DEFAULT_CAPACITY))
2135}
2136
2137#[inline]
2138fn flush_offsets(offsets: &mut OffsetBufferBuilder<i32>) -> OffsetBuffer<i32> {
2139 std::mem::replace(offsets, OffsetBufferBuilder::new(DEFAULT_CAPACITY)).finish()
2140}
2141
2142#[inline]
2143fn flush_primitive<T: ArrowPrimitiveType>(
2144 values: &mut Vec<T::Native>,
2145 nulls: Option<NullBuffer>,
2146) -> PrimitiveArray<T> {
2147 PrimitiveArray::new(flush_values(values).into(), nulls)
2148}
2149
2150#[inline]
2151fn read_decimal_bytes_be<const N: usize>(
2152 buf: &mut AvroCursor<'_>,
2153 size: &Option<usize>,
2154) -> Result<[u8; N], AvroError> {
2155 match size {
2156 Some(n) if *n == N => {
2157 let raw = buf.get_fixed(N)?;
2158 let mut arr = [0u8; N];
2159 arr.copy_from_slice(raw);
2160 Ok(arr)
2161 }
2162 Some(n) => {
2163 let raw = buf.get_fixed(*n)?;
2164 sign_cast_to::<N>(raw)
2165 }
2166 None => {
2167 let raw = buf.get_bytes()?;
2168 sign_cast_to::<N>(raw)
2169 }
2170 }
2171}
2172
2173#[inline]
2182fn sign_cast_to<const N: usize>(raw: &[u8]) -> Result<[u8; N], AvroError> {
2183 let len = raw.len();
2184 if len == N {
2186 let mut out = [0u8; N];
2187 out.copy_from_slice(raw);
2188 return Ok(out);
2189 }
2190 let first = raw.first().copied().unwrap_or(0u8);
2192 let sign_byte = if (first & 0x80) == 0 { 0x00 } else { 0xFF };
2193 let mut out = [sign_byte; N];
2195 if len > N {
2196 let extra = len - N;
2199 if raw[..extra].iter().any(|&b| b != sign_byte) {
2201 return Err(AvroError::ParseError(format!(
2202 "Decimal value with {} bytes cannot be represented in {} bytes without overflow",
2203 len, N
2204 )));
2205 }
2206 if N > 0 {
2207 let first_kept = raw[extra];
2208 let sign_bit_mismatch = ((first_kept ^ sign_byte) & 0x80) != 0;
2209 if sign_bit_mismatch {
2210 return Err(AvroError::ParseError(format!(
2211 "Decimal value with {} bytes cannot be represented in {} bytes without overflow",
2212 len, N
2213 )));
2214 }
2215 }
2216 out.copy_from_slice(&raw[extra..]);
2217 return Ok(out);
2218 }
2219 out[N - len..].copy_from_slice(raw);
2220 Ok(out)
2221}
2222
/// Returns whether the entries at positions `i` and `j` of `arr` are equal.
///
/// Two nulls compare equal, a null never equals a non-null, and two non-null
/// entries are compared through their one-element slices.
#[cfg(feature = "avro_custom_types")]
#[inline]
fn values_equal_at(arr: &dyn Array, i: usize, j: usize) -> bool {
    let (i_null, j_null) = (arr.is_null(i), arr.is_null(j));
    if i_null || j_null {
        return i_null && j_null;
    }
    arr.slice(i, 1) == arr.slice(j, 1)
}
2236
/// Maps the fields of a writer record onto reader-side decoders.
#[derive(Debug)]
struct Projector {
    /// One entry per writer field: `Some(i)` decodes into reader decoder `i`,
    /// `None` means the field must be skipped.
    writer_to_reader: Arc<[Option<usize>]>,
    /// One entry per writer field, parallel to `writer_to_reader`: the
    /// skipper used when the field has no reader mapping.
    skip_decoders: Vec<Option<Skipper>>,
    /// One entry per reader field: its default literal, if it has one.
    field_defaults: Vec<Option<AvroLiteral>>,
    /// `(reader index, literal)` pairs appended as defaults after the writer
    /// fields of each record have been processed.
    default_injections: Arc<[(usize, AvroLiteral)]>,
}
2244
/// Builds a `Projector` from a resolved record and the reader's fields.
#[derive(Debug)]
struct ProjectorBuilder<'a> {
    /// Resolution result supplying the writer-to-reader mapping, the fields
    /// to skip, and the reader fields that need defaults.
    rec: &'a ResolvedRecord,
    /// Reader fields, consulted for their default literals.
    reader_fields: Arc<[AvroField]>,
}
2250
2251impl<'a> ProjectorBuilder<'a> {
2252 #[inline]
2253 fn try_new(rec: &'a ResolvedRecord, reader_fields: &Arc<[AvroField]>) -> Self {
2254 Self {
2255 rec,
2256 reader_fields: reader_fields.clone(),
2257 }
2258 }
2259
2260 #[inline]
2261 fn build(self) -> Result<Projector, AvroError> {
2262 let reader_fields = self.reader_fields;
2263 let mut field_defaults: Vec<Option<AvroLiteral>> = Vec::with_capacity(reader_fields.len());
2264 for avro_field in reader_fields.as_ref() {
2265 if let Some(ResolutionInfo::DefaultValue(lit)) =
2266 avro_field.data_type().resolution.as_ref()
2267 {
2268 field_defaults.push(Some(lit.clone()));
2269 } else {
2270 field_defaults.push(None);
2271 }
2272 }
2273 let mut default_injections: Vec<(usize, AvroLiteral)> =
2274 Vec::with_capacity(self.rec.default_fields.len());
2275 for &idx in self.rec.default_fields.as_ref() {
2276 let lit = field_defaults
2277 .get(idx)
2278 .and_then(|lit| lit.clone())
2279 .unwrap_or(AvroLiteral::Null);
2280 default_injections.push((idx, lit));
2281 }
2282 let mut skip_decoders: Vec<Option<Skipper>> =
2283 Vec::with_capacity(self.rec.skip_fields.len());
2284 for datatype in self.rec.skip_fields.as_ref() {
2285 let skipper = match datatype {
2286 Some(datatype) => Some(Skipper::from_avro(datatype)?),
2287 None => None,
2288 };
2289 skip_decoders.push(skipper);
2290 }
2291 Ok(Projector {
2292 writer_to_reader: self.rec.writer_to_reader.clone(),
2293 skip_decoders,
2294 field_defaults,
2295 default_injections: default_injections.into(),
2296 })
2297 }
2298}
2299
2300impl Projector {
2301 #[inline]
2302 fn project_default(&self, decoder: &mut Decoder, index: usize) -> Result<(), AvroError> {
2303 if let Some(default_literal) = self.field_defaults[index].as_ref() {
2310 decoder.append_default(default_literal)
2311 } else {
2312 decoder.append_null()
2313 }
2314 }
2315
2316 #[inline]
2317 fn project_record(
2318 &mut self,
2319 buf: &mut AvroCursor<'_>,
2320 encodings: &mut [Decoder],
2321 ) -> Result<(), AvroError> {
2322 debug_assert_eq!(
2323 self.writer_to_reader.len(),
2324 self.skip_decoders.len(),
2325 "internal invariant: mapping and skipper lists must have equal length"
2326 );
2327 for (i, (mapping, skipper_opt)) in self
2328 .writer_to_reader
2329 .iter()
2330 .zip(self.skip_decoders.iter_mut())
2331 .enumerate()
2332 {
2333 match (mapping, skipper_opt.as_mut()) {
2334 (Some(reader_index), _) => encodings[*reader_index].decode(buf)?,
2335 (None, Some(skipper)) => skipper.skip(buf)?,
2336 (None, None) => {
2337 return Err(AvroError::SchemaError(format!(
2338 "No skipper available for writer-only field at index {i}",
2339 )));
2340 }
2341 }
2342 }
2343 for (reader_index, lit) in self.default_injections.as_ref() {
2344 encodings[*reader_index].append_default(lit)?;
2345 }
2346 Ok(())
2347 }
2348}
2349
/// Consumes an encoded value from a cursor without materializing it.
///
/// Each variant describes how one value is stepped over; see `skip` for the
/// exact cursor calls.
#[derive(Debug)]
enum Skipper {
    /// Consumes no bytes.
    Null,
    /// Consumes one boolean.
    Boolean,
    /// Consumes one int.
    Int32,
    /// Consumes one long.
    Int64,
    /// Consumes one float.
    Float32,
    /// Consumes one double.
    Float64,
    /// Consumes a length-prefixed byte string.
    Bytes,
    /// Consumes a length-prefixed byte string.
    String,
    /// Consumes one long.
    TimeMicros,
    /// Consumes one long.
    TimestampMillis,
    /// Consumes one long.
    TimestampMicros,
    /// Consumes one long.
    TimestampNanos,
    /// Consumes the given number of fixed bytes.
    Fixed(usize),
    /// Consumes fixed bytes when a size is given, otherwise a
    /// length-prefixed byte string.
    Decimal(Option<usize>),
    /// Consumes a length-prefixed byte string.
    UuidString,
    /// Consumes one int.
    Enum,
    /// Consumes 12 fixed bytes.
    DurationFixed12,
    /// Block-encoded sequence whose items are skipped with the inner skipper.
    List(Box<Skipper>),
    /// Block-encoded sequence of entries: a length-prefixed key followed by
    /// a value skipped with the inner skipper.
    Map(Box<Skipper>),
    /// Fields skipped one after another, in order.
    Struct(Vec<Skipper>),
    /// Branch tag followed by a value skipped with the selected branch.
    Union(Vec<Skipper>),
    /// Branch tag followed, when the tag selects the non-null branch, by a
    /// value skipped with the inner skipper.
    Nullable(Nullability, Box<Skipper>),
    /// Skipped exactly like the inner skipper.
    #[cfg(feature = "avro_custom_types")]
    RunEndEncoded(Box<Skipper>),
}
2382
impl Skipper {
    /// Builds the skipper matching the codec of `dt`.
    ///
    /// Nested codecs (list, map, struct, union, run-end encoded) are built
    /// recursively. When `dt.nullability()` is set, the result is wrapped in
    /// `Skipper::Nullable`.
    ///
    /// # Errors
    /// Returns `AvroError::SchemaError` for a union with more branches than
    /// the limit checked below, and propagates errors from nested types.
    fn from_avro(dt: &AvroDataType) -> Result<Self, AvroError> {
        let mut base = match dt.codec() {
            Codec::Null => Self::Null,
            Codec::Boolean => Self::Boolean,
            Codec::Int32 | Codec::Date32 | Codec::TimeMillis => Self::Int32,
            Codec::Int64 => Self::Int64,
            Codec::TimeMicros => Self::TimeMicros,
            Codec::TimestampMillis(_) => Self::TimestampMillis,
            Codec::TimestampMicros(_) => Self::TimestampMicros,
            Codec::TimestampNanos(_) => Self::TimestampNanos,
            #[cfg(feature = "avro_custom_types")]
            Codec::DurationNanos
            | Codec::DurationMicros
            | Codec::DurationMillis
            | Codec::DurationSeconds => Self::Int64,
            #[cfg(feature = "avro_custom_types")]
            Codec::Int8 | Codec::Int16 | Codec::UInt8 | Codec::UInt16 | Codec::Time32Secs => {
                Self::Int32
            }
            #[cfg(feature = "avro_custom_types")]
            Codec::UInt32 | Codec::Date64 | Codec::TimeNanos | Codec::TimestampSecs(_) => {
                Self::Int64
            }
            // The custom types below are skipped as fixed-width byte runs.
            #[cfg(feature = "avro_custom_types")]
            Codec::UInt64 => Self::Fixed(8),
            #[cfg(feature = "avro_custom_types")]
            Codec::Float16 => Self::Fixed(2),
            #[cfg(feature = "avro_custom_types")]
            Codec::IntervalYearMonth => Self::Fixed(4),
            #[cfg(feature = "avro_custom_types")]
            Codec::IntervalMonthDayNano => Self::Fixed(16),
            #[cfg(feature = "avro_custom_types")]
            Codec::IntervalDayTime => Self::Fixed(8),
            Codec::Float32 => Self::Float32,
            Codec::Float64 => Self::Float64,
            Codec::Binary => Self::Bytes,
            Codec::Utf8 | Codec::Utf8View => Self::String,
            Codec::Fixed(sz) => Self::Fixed(*sz as usize),
            Codec::Decimal(_, _, size) => Self::Decimal(*size),
            Codec::Uuid => Self::UuidString, Codec::Enum(_) => Self::Enum,
            Codec::List(item) => Self::List(Box::new(Skipper::from_avro(item)?)),
            Codec::Struct(fields) => Self::Struct(
                fields
                    .iter()
                    .map(|f| Skipper::from_avro(f.data_type()))
                    .collect::<Result<_, _>>()?,
            ),
            Codec::Map(values) => Self::Map(Box::new(Skipper::from_avro(values)?)),
            Codec::Interval => Self::DurationFixed12,
            Codec::Union(encodings, _, _) => {
                // Reject unions with more branches than the tag limit
                // referenced in the error message below.
                let max_addr = (i32::MAX as usize) + 1;
                if encodings.len() > max_addr {
                    return Err(AvroError::SchemaError(format!(
                        "Writer union has {} branches, which exceeds the maximum addressable \
                         branches by an Avro int tag ({} + 1).",
                        encodings.len(),
                        i32::MAX
                    )));
                }
                Self::Union(
                    encodings
                        .iter()
                        .map(Skipper::from_avro)
                        .collect::<Result<_, _>>()?,
                )
            }
            #[cfg(feature = "avro_custom_types")]
            Codec::RunEndEncoded(inner, _w) => {
                Self::RunEndEncoded(Box::new(Skipper::from_avro(inner)?))
            }
        };
        // A nullable type carries a branch tag in front of the value.
        if let Some(n) = dt.nullability() {
            base = Self::Nullable(n, Box::new(base));
        }
        Ok(base)
    }

    /// Advances `buf` past exactly one value of this skipper's shape.
    ///
    /// # Errors
    /// Propagates cursor errors; returns `AvroError::ParseError` for a union
    /// branch index that is negative, does not fit in `usize`, or is out of
    /// range for the known branches.
    fn skip(&mut self, buf: &mut AvroCursor<'_>) -> Result<(), AvroError> {
        match self {
            Self::Null => Ok(()),
            Self::Boolean => {
                buf.get_bool()?;
                Ok(())
            }
            Self::Int32 => {
                buf.skip_int()?;
                Ok(())
            }
            Self::Int64
            | Self::TimeMicros
            | Self::TimestampMillis
            | Self::TimestampMicros
            | Self::TimestampNanos => {
                buf.skip_long()?;
                Ok(())
            }
            Self::Float32 => {
                buf.get_float()?;
                Ok(())
            }
            Self::Float64 => {
                buf.get_double()?;
                Ok(())
            }
            Self::Bytes | Self::String | Self::UuidString => {
                buf.get_bytes()?;
                Ok(())
            }
            Self::Fixed(sz) => {
                buf.get_fixed(*sz)?;
                Ok(())
            }
            Self::Decimal(size) => {
                // A known size means fixed bytes; otherwise the value is a
                // length-prefixed byte string.
                if let Some(s) = size {
                    buf.get_fixed(*s)
                } else {
                    buf.get_bytes()
                }?;
                Ok(())
            }
            Self::Enum => {
                buf.skip_int()?;
                Ok(())
            }
            Self::DurationFixed12 => {
                buf.get_fixed(12)?;
                Ok(())
            }
            Self::List(item) => {
                skip_blocks(buf, |c| item.skip(c))?;
                Ok(())
            }
            Self::Map(value) => {
                // Each entry is a length-prefixed key followed by the value.
                skip_blocks(buf, |c| {
                    c.get_bytes()?; value.skip(c)
                })?;
                Ok(())
            }
            Self::Struct(fields) => {
                for f in fields.iter_mut() {
                    f.skip(buf)?
                }
                Ok(())
            }
            Self::Union(encodings) => {
                // The branch tag selects which skipper consumes the value.
                let raw = buf.get_long()?;
                if raw < 0 {
                    return Err(AvroError::ParseError(format!(
                        "Negative union branch index {raw}"
                    )));
                }
                let idx: usize = usize::try_from(raw).map_err(|_| {
                    AvroError::ParseError(format!(
                        "Union branch index {raw} does not fit into usize on this platform ({}-bit)",
                        (usize::BITS as usize)
                    ))
                })?;
                let Some(encoding) = encodings.get_mut(idx) else {
                    return Err(AvroError::ParseError(format!(
                        "Union branch index {idx} out of range for skipper ({} branches)",
                        encodings.len()
                    )));
                };
                encoding.skip(buf)
            }
            Self::Nullable(order, inner) => {
                // Only zero vs. non-zero matters for the tag: zero selects
                // the first branch, and `order` says which branch is null.
                let branch = buf.read_vlq()?;
                let is_not_null = match *order {
                    Nullability::NullFirst => branch != 0,
                    Nullability::NullSecond => branch == 0,
                };
                if is_not_null {
                    inner.skip(buf)?;
                }
                Ok(())
            }
            #[cfg(feature = "avro_custom_types")]
            Self::RunEndEncoded(inner) => inner.skip(buf),
        }
    }
}
2568
2569#[cfg(test)]
2570mod tests {
2571 use super::*;
2572 use crate::codec::AvroFieldBuilder;
2573 use crate::schema::{Attributes, ComplexType, Field, PrimitiveType, Record, Schema, TypeName};
2574 use arrow_array::cast::AsArray;
2575 use indexmap::IndexMap;
2576 use std::collections::HashMap;
2577
2578 fn encode_avro_int(value: i32) -> Vec<u8> {
2579 let mut buf = Vec::new();
2580 let mut v = (value << 1) ^ (value >> 31);
2581 while v & !0x7F != 0 {
2582 buf.push(((v & 0x7F) | 0x80) as u8);
2583 v >>= 7;
2584 }
2585 buf.push(v as u8);
2586 buf
2587 }
2588
2589 fn encode_avro_long(value: i64) -> Vec<u8> {
2590 let mut buf = Vec::new();
2591 let mut v = (value << 1) ^ (value >> 63);
2592 while v & !0x7F != 0 {
2593 buf.push(((v & 0x7F) | 0x80) as u8);
2594 v >>= 7;
2595 }
2596 buf.push(v as u8);
2597 buf
2598 }
2599
2600 fn encode_avro_bytes(bytes: &[u8]) -> Vec<u8> {
2601 let mut buf = encode_avro_long(bytes.len() as i64);
2602 buf.extend_from_slice(bytes);
2603 buf
2604 }
2605
2606 fn avro_from_codec(codec: Codec) -> AvroDataType {
2607 AvroDataType::new(codec, Default::default(), None)
2608 }
2609
2610 fn resolved_root_datatype(
2611 writer: Schema<'static>,
2612 reader: Schema<'static>,
2613 use_utf8view: bool,
2614 strict_mode: bool,
2615 ) -> AvroDataType {
2616 let writer_record = Schema::Complex(ComplexType::Record(Record {
2618 name: "Root",
2619 namespace: None,
2620 doc: None,
2621 aliases: vec![],
2622 fields: vec![Field {
2623 name: "v",
2624 r#type: writer,
2625 default: None,
2626 doc: None,
2627 aliases: vec![],
2628 }],
2629 attributes: Attributes::default(),
2630 }));
2631
2632 let reader_record = Schema::Complex(ComplexType::Record(Record {
2634 name: "Root",
2635 namespace: None,
2636 doc: None,
2637 aliases: vec![],
2638 fields: vec![Field {
2639 name: "v",
2640 r#type: reader,
2641 default: None,
2642 doc: None,
2643 aliases: vec![],
2644 }],
2645 attributes: Attributes::default(),
2646 }));
2647
2648 let field = AvroFieldBuilder::new(&writer_record)
2650 .with_reader_schema(&reader_record)
2651 .with_utf8view(use_utf8view)
2652 .with_strict_mode(strict_mode)
2653 .build()
2654 .expect("schema resolution should succeed");
2655
2656 match field.data_type().codec() {
2657 Codec::Struct(fields) => fields[0].data_type().clone(),
2658 other => panic!("expected wrapper struct, got {other:?}"),
2659 }
2660 }
2661
2662 fn decoder_for_promotion(
2663 writer: PrimitiveType,
2664 reader: PrimitiveType,
2665 use_utf8view: bool,
2666 ) -> Decoder {
2667 let ws = Schema::TypeName(TypeName::Primitive(writer));
2668 let rs = Schema::TypeName(TypeName::Primitive(reader));
2669 let dt = resolved_root_datatype(ws, rs, use_utf8view, false);
2670 Decoder::try_new(&dt).unwrap()
2671 }
2672
2673 fn make_avro_dt(codec: Codec, nullability: Option<Nullability>) -> AvroDataType {
2674 AvroDataType::new(codec, HashMap::new(), nullability)
2675 }
2676
2677 #[cfg(feature = "avro_custom_types")]
2678 fn encode_vlq_u64(mut x: u64) -> Vec<u8> {
2679 let mut out = Vec::with_capacity(10);
2680 while x >= 0x80 {
2681 out.push((x as u8) | 0x80);
2682 x >>= 7;
2683 }
2684 out.push(x as u8);
2685 out
2686 }
2687
2688 #[test]
2689 fn test_union_resolution_writer_union_reader_union_reorder_and_promotion_dense() {
2690 let ws = Schema::Union(vec![
2691 Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
2692 Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2693 ]);
2694 let rs = Schema::Union(vec![
2695 Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2696 Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
2697 ]);
2698
2699 let dt = resolved_root_datatype(ws, rs, false, false);
2700 let mut dec = Decoder::try_new(&dt).unwrap();
2701
2702 let mut rec1 = encode_avro_long(0);
2703 rec1.extend(encode_avro_int(7));
2704 let mut cur1 = AvroCursor::new(&rec1);
2705 dec.decode(&mut cur1).unwrap();
2706
2707 let mut rec2 = encode_avro_long(1);
2708 rec2.extend(encode_avro_bytes("abc".as_bytes()));
2709 let mut cur2 = AvroCursor::new(&rec2);
2710 dec.decode(&mut cur2).unwrap();
2711
2712 let arr = dec.flush(None).unwrap();
2713 let ua = arr
2714 .as_any()
2715 .downcast_ref::<UnionArray>()
2716 .expect("dense union output");
2717
2718 assert_eq!(
2719 ua.type_id(0),
2720 1,
2721 "first value must select reader 'long' branch"
2722 );
2723 assert_eq!(ua.value_offset(0), 0);
2724
2725 assert_eq!(
2726 ua.type_id(1),
2727 0,
2728 "second value must select reader 'string' branch"
2729 );
2730 assert_eq!(ua.value_offset(1), 0);
2731
2732 let long_child = ua.child(1).as_any().downcast_ref::<Int64Array>().unwrap();
2733 assert_eq!(long_child.len(), 1);
2734 assert_eq!(long_child.value(0), 7);
2735
2736 let str_child = ua.child(0).as_any().downcast_ref::<StringArray>().unwrap();
2737 assert_eq!(str_child.len(), 1);
2738 assert_eq!(str_child.value(0), "abc");
2739 }
2740
2741 #[test]
2742 fn test_union_resolution_writer_union_reader_nonunion_promotion_int_to_long() {
2743 let ws = Schema::Union(vec![
2744 Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
2745 Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2746 ]);
2747 let rs = Schema::TypeName(TypeName::Primitive(PrimitiveType::Long));
2748
2749 let dt = resolved_root_datatype(ws, rs, false, false);
2750 let mut dec = Decoder::try_new(&dt).unwrap();
2751
2752 let mut data = encode_avro_long(0);
2753 data.extend(encode_avro_int(5));
2754 let mut cur = AvroCursor::new(&data);
2755 dec.decode(&mut cur).unwrap();
2756
2757 let arr = dec.flush(None).unwrap();
2758 let out = arr.as_any().downcast_ref::<Int64Array>().unwrap();
2759 assert_eq!(out.len(), 1);
2760 assert_eq!(out.value(0), 5);
2761 }
2762
2763 #[test]
2764 fn test_union_resolution_writer_union_reader_nonunion_mismatch_errors() {
2765 let ws = Schema::Union(vec![
2766 Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
2767 Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2768 ]);
2769 let rs = Schema::TypeName(TypeName::Primitive(PrimitiveType::Long));
2770
2771 let dt = resolved_root_datatype(ws, rs, false, false);
2772 let mut dec = Decoder::try_new(&dt).unwrap();
2773
2774 let mut data = encode_avro_long(1);
2775 data.extend(encode_avro_bytes("z".as_bytes()));
2776 let mut cur = AvroCursor::new(&data);
2777 let res = dec.decode(&mut cur);
2778 assert!(
2779 res.is_err(),
2780 "expected error when writer union branch does not resolve to reader non-union type"
2781 );
2782 }
2783
2784 #[test]
2785 fn test_union_resolution_writer_nonunion_reader_union_selects_matching_branch() {
2786 let ws = Schema::TypeName(TypeName::Primitive(PrimitiveType::Int));
2787 let rs = Schema::Union(vec![
2788 Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2789 Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
2790 ]);
2791
2792 let dt = resolved_root_datatype(ws, rs, false, false);
2793 let mut dec = Decoder::try_new(&dt).unwrap();
2794
2795 let data = encode_avro_int(6);
2796 let mut cur = AvroCursor::new(&data);
2797 dec.decode(&mut cur).unwrap();
2798
2799 let arr = dec.flush(None).unwrap();
2800 let ua = arr
2801 .as_any()
2802 .downcast_ref::<UnionArray>()
2803 .expect("dense union output");
2804 assert_eq!(ua.len(), 1);
2805 assert_eq!(
2806 ua.type_id(0),
2807 1,
2808 "must resolve to reader 'long' branch (type_id 1)"
2809 );
2810 assert_eq!(ua.value_offset(0), 0);
2811
2812 let long_child = ua.child(1).as_any().downcast_ref::<Int64Array>().unwrap();
2813 assert_eq!(long_child.len(), 1);
2814 assert_eq!(long_child.value(0), 6);
2815
2816 let str_child = ua.child(0).as_any().downcast_ref::<StringArray>().unwrap();
2817 assert_eq!(str_child.len(), 0, "string branch must be empty");
2818 }
2819
2820 #[test]
2821 fn test_union_resolution_writer_union_reader_union_unmapped_branch_errors() {
2822 let ws = Schema::Union(vec![
2823 Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
2824 Schema::TypeName(TypeName::Primitive(PrimitiveType::Boolean)),
2825 ]);
2826 let rs = Schema::Union(vec![
2827 Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2828 Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
2829 ]);
2830
2831 let dt = resolved_root_datatype(ws, rs, false, false);
2832 let mut dec = Decoder::try_new(&dt).unwrap();
2833
2834 let mut data = encode_avro_long(1);
2835 data.push(1);
2836 let mut cur = AvroCursor::new(&data);
2837 let res = dec.decode(&mut cur);
2838 assert!(
2839 res.is_err(),
2840 "expected error for unmapped writer 'boolean' branch"
2841 );
2842 }
2843
2844 #[test]
2845 fn test_schema_resolution_promotion_int_to_long() {
2846 let mut dec = decoder_for_promotion(PrimitiveType::Int, PrimitiveType::Long, false);
2847 assert!(matches!(dec, Decoder::Int32ToInt64(_)));
2848 for v in [0, 1, -2, 123456] {
2849 let data = encode_avro_int(v);
2850 let mut cur = AvroCursor::new(&data);
2851 dec.decode(&mut cur).unwrap();
2852 }
2853 let arr = dec.flush(None).unwrap();
2854 let a = arr.as_any().downcast_ref::<Int64Array>().unwrap();
2855 assert_eq!(a.value(0), 0);
2856 assert_eq!(a.value(1), 1);
2857 assert_eq!(a.value(2), -2);
2858 assert_eq!(a.value(3), 123456);
2859 }
2860
2861 #[test]
2862 fn test_schema_resolution_promotion_int_to_float() {
2863 let mut dec = decoder_for_promotion(PrimitiveType::Int, PrimitiveType::Float, false);
2864 assert!(matches!(dec, Decoder::Int32ToFloat32(_)));
2865 for v in [0, 42, -7] {
2866 let data = encode_avro_int(v);
2867 let mut cur = AvroCursor::new(&data);
2868 dec.decode(&mut cur).unwrap();
2869 }
2870 let arr = dec.flush(None).unwrap();
2871 let a = arr.as_any().downcast_ref::<Float32Array>().unwrap();
2872 assert_eq!(a.value(0), 0.0);
2873 assert_eq!(a.value(1), 42.0);
2874 assert_eq!(a.value(2), -7.0);
2875 }
2876
2877 #[test]
2878 fn test_schema_resolution_promotion_int_to_double() {
2879 let mut dec = decoder_for_promotion(PrimitiveType::Int, PrimitiveType::Double, false);
2880 assert!(matches!(dec, Decoder::Int32ToFloat64(_)));
2881 for v in [1, -1, 10_000] {
2882 let data = encode_avro_int(v);
2883 let mut cur = AvroCursor::new(&data);
2884 dec.decode(&mut cur).unwrap();
2885 }
2886 let arr = dec.flush(None).unwrap();
2887 let a = arr.as_any().downcast_ref::<Float64Array>().unwrap();
2888 assert_eq!(a.value(0), 1.0);
2889 assert_eq!(a.value(1), -1.0);
2890 assert_eq!(a.value(2), 10_000.0);
2891 }
2892
2893 #[test]
2894 fn test_schema_resolution_promotion_long_to_float() {
2895 let mut dec = decoder_for_promotion(PrimitiveType::Long, PrimitiveType::Float, false);
2896 assert!(matches!(dec, Decoder::Int64ToFloat32(_)));
2897 for v in [0_i64, 1_000_000_i64, -123_i64] {
2898 let data = encode_avro_long(v);
2899 let mut cur = AvroCursor::new(&data);
2900 dec.decode(&mut cur).unwrap();
2901 }
2902 let arr = dec.flush(None).unwrap();
2903 let a = arr.as_any().downcast_ref::<Float32Array>().unwrap();
2904 assert_eq!(a.value(0), 0.0);
2905 assert_eq!(a.value(1), 1_000_000.0);
2906 assert_eq!(a.value(2), -123.0);
2907 }
2908
2909 #[test]
2910 fn test_schema_resolution_promotion_long_to_double() {
2911 let mut dec = decoder_for_promotion(PrimitiveType::Long, PrimitiveType::Double, false);
2912 assert!(matches!(dec, Decoder::Int64ToFloat64(_)));
2913 for v in [2_i64, -2_i64, 9_223_372_i64] {
2914 let data = encode_avro_long(v);
2915 let mut cur = AvroCursor::new(&data);
2916 dec.decode(&mut cur).unwrap();
2917 }
2918 let arr = dec.flush(None).unwrap();
2919 let a = arr.as_any().downcast_ref::<Float64Array>().unwrap();
2920 assert_eq!(a.value(0), 2.0);
2921 assert_eq!(a.value(1), -2.0);
2922 assert_eq!(a.value(2), 9_223_372.0);
2923 }
2924
2925 #[test]
2926 fn test_schema_resolution_promotion_float_to_double() {
2927 let mut dec = decoder_for_promotion(PrimitiveType::Float, PrimitiveType::Double, false);
2928 assert!(matches!(dec, Decoder::Float32ToFloat64(_)));
2929 for v in [0.5_f32, -3.25_f32, 1.0e6_f32] {
2930 let data = v.to_le_bytes().to_vec();
2931 let mut cur = AvroCursor::new(&data);
2932 dec.decode(&mut cur).unwrap();
2933 }
2934 let arr = dec.flush(None).unwrap();
2935 let a = arr.as_any().downcast_ref::<Float64Array>().unwrap();
2936 assert_eq!(a.value(0), 0.5_f64);
2937 assert_eq!(a.value(1), -3.25_f64);
2938 assert_eq!(a.value(2), 1.0e6_f64);
2939 }
2940
2941 #[test]
2942 fn test_schema_resolution_promotion_bytes_to_string_utf8() {
2943 let mut dec = decoder_for_promotion(PrimitiveType::Bytes, PrimitiveType::String, false);
2944 assert!(matches!(dec, Decoder::BytesToString(_, _)));
2945 for s in ["hello", "world", "héllo"] {
2946 let data = encode_avro_bytes(s.as_bytes());
2947 let mut cur = AvroCursor::new(&data);
2948 dec.decode(&mut cur).unwrap();
2949 }
2950 let arr = dec.flush(None).unwrap();
2951 let a = arr.as_any().downcast_ref::<StringArray>().unwrap();
2952 assert_eq!(a.value(0), "hello");
2953 assert_eq!(a.value(1), "world");
2954 assert_eq!(a.value(2), "héllo");
2955 }
2956
2957 #[test]
2958 fn test_schema_resolution_promotion_bytes_to_string_utf8view_enabled() {
2959 let mut dec = decoder_for_promotion(PrimitiveType::Bytes, PrimitiveType::String, true);
2960 assert!(matches!(dec, Decoder::BytesToString(_, _)));
2961 let data = encode_avro_bytes("abc".as_bytes());
2962 let mut cur = AvroCursor::new(&data);
2963 dec.decode(&mut cur).unwrap();
2964 let arr = dec.flush(None).unwrap();
2965 let a = arr.as_any().downcast_ref::<StringArray>().unwrap();
2966 assert_eq!(a.value(0), "abc");
2967 }
2968
2969 #[test]
2970 fn test_schema_resolution_promotion_string_to_bytes() {
2971 let mut dec = decoder_for_promotion(PrimitiveType::String, PrimitiveType::Bytes, false);
2972 assert!(matches!(dec, Decoder::StringToBytes(_, _)));
2973 for s in ["", "abc", "data"] {
2974 let data = encode_avro_bytes(s.as_bytes());
2975 let mut cur = AvroCursor::new(&data);
2976 dec.decode(&mut cur).unwrap();
2977 }
2978 let arr = dec.flush(None).unwrap();
2979 let a = arr.as_any().downcast_ref::<BinaryArray>().unwrap();
2980 assert_eq!(a.value(0), b"");
2981 assert_eq!(a.value(1), b"abc");
2982 assert_eq!(a.value(2), "data".as_bytes());
2983 }
2984
2985 #[test]
2986 fn test_schema_resolution_no_promotion_passthrough_int() {
2987 let ws = Schema::TypeName(TypeName::Primitive(PrimitiveType::Int));
2988 let rs = Schema::TypeName(TypeName::Primitive(PrimitiveType::Int));
2989 let writer_record = Schema::Complex(ComplexType::Record(Record {
2991 name: "Root",
2992 namespace: None,
2993 doc: None,
2994 aliases: vec![],
2995 fields: vec![Field {
2996 name: "v",
2997 r#type: ws,
2998 default: None,
2999 doc: None,
3000 aliases: vec![],
3001 }],
3002 attributes: Attributes::default(),
3003 }));
3004 let reader_record = Schema::Complex(ComplexType::Record(Record {
3005 name: "Root",
3006 namespace: None,
3007 doc: None,
3008 aliases: vec![],
3009 fields: vec![Field {
3010 name: "v",
3011 r#type: rs,
3012 default: None,
3013 doc: None,
3014 aliases: vec![],
3015 }],
3016 attributes: Attributes::default(),
3017 }));
3018 let field = AvroFieldBuilder::new(&writer_record)
3019 .with_reader_schema(&reader_record)
3020 .with_utf8view(false)
3021 .with_strict_mode(false)
3022 .build()
3023 .unwrap();
3024 let dt = match field.data_type().codec() {
3026 Codec::Struct(fields) => fields[0].data_type().clone(),
3027 other => panic!("expected wrapper struct, got {other:?}"),
3028 };
3029 let mut dec = Decoder::try_new(&dt).unwrap();
3030 assert!(matches!(dec, Decoder::Int32(_)));
3031 for v in [7, -9] {
3032 let data = encode_avro_int(v);
3033 let mut cur = AvroCursor::new(&data);
3034 dec.decode(&mut cur).unwrap();
3035 }
3036 let arr = dec.flush(None).unwrap();
3037 let a = arr.as_any().downcast_ref::<Int32Array>().unwrap();
3038 assert_eq!(a.value(0), 7);
3039 assert_eq!(a.value(1), -9);
3040 }
3041
// Resolving a writer field of type `int` against a reader field of type
// `boolean` must fail when the resolved field is built.
#[test]
fn test_schema_resolution_illegal_promotion_int_to_boolean_errors() {
    // Writer and reader share the same single-field `Root` record shape; only
    // the primitive type of field `v` differs between them.
    let root_with_v = |primitive: PrimitiveType| {
        Schema::Complex(ComplexType::Record(Record {
            name: "Root",
            namespace: None,
            doc: None,
            aliases: vec![],
            fields: vec![Field {
                name: "v",
                r#type: Schema::TypeName(TypeName::Primitive(primitive)),
                default: None,
                doc: None,
                aliases: vec![],
            }],
            attributes: Attributes::default(),
        }))
    };
    let writer_record = root_with_v(PrimitiveType::Int);
    let reader_record = root_with_v(PrimitiveType::Boolean);
    let outcome = AvroFieldBuilder::new(&writer_record)
        .with_reader_schema(&reader_record)
        .with_utf8view(false)
        .with_strict_mode(false)
        .build();
    assert!(outcome.is_err(), "expected error for illegal promotion");
}
3081
// Decodes one map row holding a single `"hello" -> "world"` entry and checks
// the key and value columns of the resulting `MapArray`.
#[test]
fn test_map_decoding_one_entry() {
    let map_dt = avro_from_codec(Codec::Map(Arc::new(avro_from_codec(Codec::Utf8))));
    let mut map_decoder = Decoder::try_new(&map_dt).unwrap();
    // Encoded row: a count of 1, the key, the value, then a terminating 0.
    let mut encoded = encode_avro_long(1);
    encoded.extend_from_slice(&encode_avro_bytes(b"hello"));
    encoded.extend_from_slice(&encode_avro_bytes(b"world"));
    encoded.extend_from_slice(&encode_avro_long(0));
    let mut cur = AvroCursor::new(&encoded);
    map_decoder.decode(&mut cur).unwrap();
    let flushed = map_decoder.flush(None).unwrap();
    let maps = flushed.as_any().downcast_ref::<MapArray>().unwrap();
    assert_eq!(maps.len(), 1);
    assert_eq!(maps.value_length(0), 1);
    let entries = maps.value(0);
    let entry_struct = entries.as_any().downcast_ref::<StructArray>().unwrap();
    assert_eq!(entry_struct.len(), 1);
    // Both entry columns are expected to be plain string arrays.
    let keys = entry_struct.column_by_name("key").unwrap();
    let keys = keys.as_any().downcast_ref::<StringArray>().unwrap();
    let vals = entry_struct.column_by_name("value").unwrap();
    let vals = vals.as_any().downcast_ref::<StringArray>().unwrap();
    assert_eq!(keys.value(0), "hello");
    assert_eq!(vals.value(0), "world");
}
3117
// A map row encoded as a lone `0` long must decode to one row with no entries.
#[test]
fn test_map_decoding_empty() {
    let utf8_values = Arc::new(avro_from_codec(Codec::Utf8));
    let map_dt = avro_from_codec(Codec::Map(utf8_values));
    let mut map_decoder = Decoder::try_new(&map_dt).unwrap();
    let end_marker = encode_avro_long(0);
    let mut cur = AvroCursor::new(&end_marker);
    map_decoder.decode(&mut cur).unwrap();
    let flushed = map_decoder.flush(None).unwrap();
    let maps = flushed.as_any().downcast_ref::<MapArray>().unwrap();
    assert_eq!(maps.len(), 1);
    assert_eq!(maps.value_length(0), 0);
}
3130
// Decodes two 3-byte `fixed` values from separate cursors and verifies both
// the cursor advancement and the contents of the flushed array.
#[test]
fn test_fixed_decoding() {
    let fixed_dt = avro_from_codec(Codec::Fixed(3));
    let mut fixed_decoder = Decoder::try_new(&fixed_dt).expect("Failed to create decoder");

    // Each input is paired with the message reported if decoding it fails.
    let inputs = [
        ([1u8, 2, 3], "Failed to decode data1"),
        ([4u8, 5, 6], "Failed to decode data2"),
    ];
    for (bytes, decode_failure) in inputs {
        let mut cur = AvroCursor::new(&bytes);
        fixed_decoder.decode(&mut cur).expect(decode_failure);
        assert_eq!(cur.position(), 3, "Cursor should advance by fixed size");
    }

    let flushed = fixed_decoder.flush(None).expect("Failed to flush decoder");
    assert_eq!(flushed.len(), 2, "Array should contain two items");
    let fixed = flushed
        .as_any()
        .downcast_ref::<FixedSizeBinaryArray>()
        .expect("Failed to downcast to FixedSizeBinaryArray");
    assert_eq!(
        fixed.value_length(),
        3,
        "Fixed size of binary values should be 3"
    );
    let first = fixed.value(0);
    let second = fixed.value(1);
    assert_eq!(first, &[1, 2, 3], "First item mismatch");
    assert_eq!(second, &[4, 5, 6], "Second item mismatch");
}
3170
// Flushing a `fixed(5)` decoder that never decoded anything must still yield
// an empty `FixedSizeBinaryArray` whose value length is 5.
#[test]
fn test_fixed_decoding_empty() {
    let fixed_dt = avro_from_codec(Codec::Fixed(5));
    let mut fixed_decoder = Decoder::try_new(&fixed_dt).expect("Failed to create decoder");

    // No `decode` call: flush straight away.
    let flushed = fixed_decoder
        .flush(None)
        .expect("Failed to flush decoder for empty input");
    assert_eq!(flushed.len(), 0, "Array should be empty");

    let fixed = flushed
        .as_any()
        .downcast_ref::<FixedSizeBinaryArray>()
        .expect("Failed to downcast to FixedSizeBinaryArray for empty array");
    let width = fixed.value_length();
    assert_eq!(
        width, 5,
        "Fixed size of binary values should be 5 as per type"
    );
}
3192
// A UUID written as its textual form must be decoded into a 16-byte
// `FixedSizeBinaryArray` value holding the raw UUID bytes.
#[test]
fn test_uuid_decoding() {
    let uuid_dt = avro_from_codec(Codec::Uuid);
    let mut uuid_decoder = Decoder::try_new(&uuid_dt).expect("Failed to create decoder");
    let text = "f81d4fae-7dec-11d0-a765-00a0c91e6bf6";
    let encoded = encode_avro_bytes(text.as_bytes());
    let mut cur = AvroCursor::new(&encoded);
    uuid_decoder
        .decode(&mut cur)
        .expect("Failed to decode data");
    let consumed = cur.position();
    assert_eq!(
        consumed,
        encoded.len(),
        "Cursor should advance by varint size + data size"
    );
    let flushed = uuid_decoder.flush(None).expect("Failed to flush decoder");
    let fixed = flushed
        .as_any()
        .downcast_ref::<FixedSizeBinaryArray>()
        .expect("Array should be a FixedSizeBinaryArray");
    assert_eq!(fixed.len(), 1);
    assert_eq!(fixed.value_length(), 16);
    // Same hex digits as `text`, with the dashes removed.
    let raw_uuid = [
        0xf8, 0x1d, 0x4f, 0xae, // f81d4fae
        0x7d, 0xec, // 7dec
        0x11, 0xd0, // 11d0
        0xa7, 0x65, // a765
        0x00, 0xa0, 0xc9, 0x1e, 0x6b, 0xf6, // 00a0c91e6bf6
    ];
    assert_eq!(fixed.value(0), &raw_uuid);
}
3219
// Decodes two list rows (`[10, 20]` and an empty list) and checks the list
// offsets and the flattened child values.
#[test]
fn test_array_decoding() {
    let list_dt = avro_from_codec(Codec::List(Arc::new(avro_from_codec(Codec::Int32))));
    let mut list_decoder = Decoder::try_new(&list_dt).unwrap();
    // First row: count 2, the two items, then a terminating 0.
    let mut first_row = encode_avro_long(2);
    first_row.extend(encode_avro_int(10));
    first_row.extend(encode_avro_int(20));
    first_row.extend(encode_avro_long(0));
    // Second row: just the terminating 0.
    let second_row = encode_avro_long(0);
    for row in [&first_row, &second_row] {
        let mut cur = AvroCursor::new(row);
        list_decoder.decode(&mut cur).unwrap();
    }
    let flushed = list_decoder.flush(None).unwrap();
    let lists = flushed.as_any().downcast_ref::<ListArray>().unwrap();
    assert_eq!(lists.len(), 2);
    assert_eq!(lists.value_offsets(), &[0, 2, 2]);
    let items = lists.values().as_primitive::<Int32Type>();
    assert_eq!(items.len(), 2);
    for (idx, expected) in [10, 20].into_iter().enumerate() {
        assert_eq!(items.value(idx), expected);
    }
}
3246
// A list row whose block starts with a negative count (-3) followed by an
// extra long (12) must still decode to the three items that follow.
#[test]
fn test_array_decoding_with_negative_block_count() {
    let list_dt = avro_from_codec(Codec::List(Arc::new(avro_from_codec(Codec::Int32))));
    let mut list_decoder = Decoder::try_new(&list_dt).unwrap();
    let mut encoded: Vec<u8> = Vec::new();
    // Negative count, then the long that accompanies it.
    encoded.extend(encode_avro_long(-3));
    encoded.extend(encode_avro_long(12));
    for item in [1, 2, 3] {
        encoded.extend(encode_avro_int(item));
    }
    // Terminating zero count.
    encoded.extend(encode_avro_long(0));
    let mut cur = AvroCursor::new(&encoded);
    list_decoder.decode(&mut cur).unwrap();
    let flushed = list_decoder.flush(None).unwrap();
    let lists = flushed.as_any().downcast_ref::<ListArray>().unwrap();
    assert_eq!(lists.len(), 1);
    assert_eq!(lists.value_length(0), 3);
    let items = lists.values().as_primitive::<Int32Type>();
    assert_eq!(items.len(), 3);
    for (idx, expected) in [1, 2, 3].into_iter().enumerate() {
        assert_eq!(items.value(idx), expected);
    }
}
3270
// Decodes a list-of-lists row equal to `[[5, 6]]` and checks both nesting
// levels of the resulting `ListArray`.
#[test]
fn test_nested_array_decoding() {
    let int_list_dt = avro_from_codec(Codec::List(Arc::new(avro_from_codec(Codec::Int32))));
    let nested_dt = avro_from_codec(Codec::List(Arc::new(int_list_dt)));
    let mut nested_decoder = Decoder::try_new(&nested_dt).unwrap();
    let encoded = [
        encode_avro_long(1), // outer list: one inner list
        encode_avro_long(2), // inner list: two items
        encode_avro_int(5),
        encode_avro_int(6),
        encode_avro_long(0), // end of inner list
        encode_avro_long(0), // end of outer list
    ]
    .concat();
    let mut cur = AvroCursor::new(&encoded);
    nested_decoder.decode(&mut cur).unwrap();
    let flushed = nested_decoder.flush(None).unwrap();
    let outer_lists = flushed.as_any().downcast_ref::<ListArray>().unwrap();
    assert_eq!(outer_lists.len(), 1);
    assert_eq!(outer_lists.value_length(0), 1);
    let inner_lists = outer_lists
        .values()
        .as_any()
        .downcast_ref::<ListArray>()
        .unwrap();
    assert_eq!(inner_lists.len(), 1);
    assert_eq!(inner_lists.value_length(0), 2);
    let leaf_values = inner_lists.values();
    let ints = leaf_values.as_any().downcast_ref::<Int32Array>().unwrap();
    assert_eq!(ints.values(), &[5, 6]);
}
3299
// A list row encoded as a lone `0` long must decode to one row of length 0.
#[test]
fn test_array_decoding_empty_array() {
    let item_dt = Arc::new(avro_from_codec(Codec::Utf8));
    let list_dt = avro_from_codec(Codec::List(item_dt));
    let mut list_decoder = Decoder::try_new(&list_dt).unwrap();
    let end_marker = encode_avro_long(0);
    let mut cur = AvroCursor::new(&end_marker);
    list_decoder.decode(&mut cur).unwrap();
    let flushed = list_decoder.flush(None).unwrap();
    let lists = flushed.as_any().downcast_ref::<ListArray>().unwrap();
    assert_eq!(lists.len(), 1);
    assert_eq!(lists.value_length(0), 0);
}
3312
// Writer declares `array<int>` while the reader declares `array<[null, int]>`.
// The resolved item type must be nullable (null first), and items written
// without any union tag must decode as non-null values.
#[test]
fn test_array_decoding_writer_nonunion_items_reader_nullable_items() {
    use crate::schema::Array;
    let plain_int = Schema::TypeName(TypeName::Primitive(PrimitiveType::Int));
    let nullable_int = Schema::Union(vec![
        Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
        Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
    ]);
    let writer_schema = Schema::Complex(ComplexType::Array(Array {
        items: Box::new(plain_int),
        attributes: Attributes::default(),
    }));
    let reader_schema = Schema::Complex(ComplexType::Array(Array {
        items: Box::new(nullable_int),
        attributes: Attributes::default(),
    }));
    let resolved = resolved_root_datatype(writer_schema, reader_schema, false, false);
    match resolved.codec() {
        Codec::List(item_dt) => assert_eq!(
            item_dt.nullability(),
            Some(Nullability::NullFirst),
            "items should be nullable"
        ),
        _ => panic!("expected List codec"),
    }
    let mut list_decoder = Decoder::try_new(&resolved).unwrap();
    // Writer-side payload: count 2, the bare ints, then a terminating 0.
    let mut encoded: Vec<u8> = Vec::new();
    encoded.extend(encode_avro_long(2));
    for item in [10, 20] {
        encoded.extend(encode_avro_int(item));
    }
    encoded.extend(encode_avro_long(0));
    let mut cur = AvroCursor::new(&encoded);
    list_decoder.decode(&mut cur).unwrap();
    assert_eq!(
        cur.position(),
        encoded.len(),
        "all bytes should be consumed"
    );
    let flushed = list_decoder.flush(None).unwrap();
    let lists = flushed.as_any().downcast_ref::<ListArray>().unwrap();
    assert_eq!(lists.len(), 1, "one list/row");
    assert_eq!(lists.value_length(0), 2, "two items in the list");
    let items = lists.values().as_primitive::<Int32Type>();
    assert_eq!(items.len(), 2);
    for (idx, expected) in [10, 20].into_iter().enumerate() {
        assert_eq!(items.value(idx), expected);
    }
    for idx in 0..2 {
        assert!(!items.is_null(idx));
    }
}
3360
// Decimal(precision 50, scale 2) stored in 32 bytes must decode into a
// `Decimal256Array`.
#[test]
fn test_decimal_decoding_fixed256() {
    let decimal_dt = avro_from_codec(Codec::Decimal(50, Some(2), Some(32)));
    let mut decimal_decoder = Decoder::try_new(&decimal_dt).unwrap();
    // 32-byte rows: zero-filled ending in 0x30 0x39, and 0xFF-filled ending in 0x85.
    let mut positive = [0x00u8; 32];
    positive[30..].copy_from_slice(&[0x30, 0x39]);
    let mut negative = [0xFFu8; 32];
    negative[31] = 0x85;
    let encoded = [positive, negative].concat();
    let mut cur = AvroCursor::new(&encoded);
    for _ in 0..2 {
        decimal_decoder.decode(&mut cur).unwrap();
    }
    let flushed = decimal_decoder.flush(None).unwrap();
    let decimals = flushed
        .as_any()
        .downcast_ref::<Decimal256Array>()
        .unwrap();
    assert_eq!(decimals.len(), 2);
    for (row, rendered) in ["123.45", "-1.23"].into_iter().enumerate() {
        assert_eq!(decimals.value_as_string(row), rendered);
    }
}
3387
// Decimal(precision 28, scale 2) stored in 16 bytes must decode into a
// `Decimal128Array`.
#[test]
fn test_decimal_decoding_fixed128() {
    let decimal_dt = avro_from_codec(Codec::Decimal(28, Some(2), Some(16)));
    let mut decimal_decoder = Decoder::try_new(&decimal_dt).unwrap();
    // 16-byte rows: zero-filled ending in 0x30 0x39, and 0xFF-filled ending in 0x85.
    let mut positive = [0x00u8; 16];
    positive[14..].copy_from_slice(&[0x30, 0x39]);
    let mut negative = [0xFFu8; 16];
    negative[15] = 0x85;
    let encoded = [positive, negative].concat();
    let mut cur = AvroCursor::new(&encoded);
    for _ in 0..2 {
        decimal_decoder.decode(&mut cur).unwrap();
    }
    let flushed = decimal_decoder.flush(None).unwrap();
    let decimals = flushed
        .as_any()
        .downcast_ref::<Decimal128Array>()
        .unwrap();
    assert_eq!(decimals.len(), 2);
    for (row, rendered) in ["123.45", "-1.23"].into_iter().enumerate() {
        assert_eq!(decimals.value_as_string(row), rendered);
    }
}
3412
// Decimal(precision 5, scale 2) stored in 32 bytes: the flushed array is a
// `Decimal32Array` with `small_decimals` enabled and a `Decimal128Array`
// otherwise; the rendered values are the same either way.
#[test]
fn test_decimal_decoding_fixed32_from_32byte_fixed_storage() {
    let decimal_dt = avro_from_codec(Codec::Decimal(5, Some(2), Some(32)));
    let mut decimal_decoder = Decoder::try_new(&decimal_dt).unwrap();
    // 32-byte rows: zero-filled ending in 0x30 0x39, and 0xFF-filled ending in 0x85.
    let mut positive = [0x00u8; 32];
    positive[30..].copy_from_slice(&[0x30, 0x39]);
    let mut negative = [0xFFu8; 32];
    negative[31] = 0x85;
    let encoded = [positive, negative].concat();
    let mut cur = AvroCursor::new(&encoded);
    for _ in 0..2 {
        decimal_decoder.decode(&mut cur).unwrap();
    }
    let flushed = decimal_decoder.flush(None).unwrap();
    // Only the concrete array type is feature-dependent.
    #[cfg(feature = "small_decimals")]
    let decimals = flushed.as_any().downcast_ref::<Decimal32Array>().unwrap();
    #[cfg(not(feature = "small_decimals"))]
    let decimals = flushed
        .as_any()
        .downcast_ref::<Decimal128Array>()
        .unwrap();
    assert_eq!(decimals.len(), 2);
    assert_eq!(decimals.value_as_string(0), "123.45");
    assert_eq!(decimals.value_as_string(1), "-1.23");
}
3449
// Decimal(precision 5, scale 2) stored in 16 bytes: the flushed array is a
// `Decimal32Array` with `small_decimals` enabled and a `Decimal128Array`
// otherwise; the rendered values are the same either way.
#[test]
fn test_decimal_decoding_fixed32_from_16byte_fixed_storage() {
    let decimal_dt = avro_from_codec(Codec::Decimal(5, Some(2), Some(16)));
    let mut decimal_decoder = Decoder::try_new(&decimal_dt).unwrap();
    // 16-byte rows: zero-filled ending in 0x30 0x39, and 0xFF-filled ending in 0x85.
    let mut positive = [0x00u8; 16];
    positive[14..].copy_from_slice(&[0x30, 0x39]);
    let mut negative = [0xFFu8; 16];
    negative[15] = 0x85;
    let encoded = [positive, negative].concat();
    let mut cur = AvroCursor::new(&encoded);
    for _ in 0..2 {
        decimal_decoder.decode(&mut cur).unwrap();
    }

    let flushed = decimal_decoder.flush(None).unwrap();
    // Only the concrete array type is feature-dependent.
    #[cfg(feature = "small_decimals")]
    let decimals = flushed.as_any().downcast_ref::<Decimal32Array>().unwrap();
    #[cfg(not(feature = "small_decimals"))]
    let decimals = flushed
        .as_any()
        .downcast_ref::<Decimal128Array>()
        .unwrap();
    assert_eq!(decimals.len(), 2);
    assert_eq!(decimals.value_as_string(0), "123.45");
    assert_eq!(decimals.value_as_string(1), "-1.23");
}
3485
// Bytes-backed Decimal(precision 4, scale 1) wrapped in a hand-built
// null-second `Decoder::Nullable`: rows are value, null, value.
#[test]
fn test_decimal_decoding_bytes_with_nulls() {
    let decimal_dt = avro_from_codec(Codec::Decimal(4, Some(1), None));
    let value_decoder = Decoder::try_new(&decimal_dt).unwrap();
    let mut nullable_decoder = Decoder::Nullable(
        Nullability::NullSecond,
        NullBufferBuilder::new(DEFAULT_CAPACITY),
        Box::new(value_decoder),
        NullablePlan::ReadTag,
    );
    // Tag 0 precedes a value payload; tag 1 stands alone for the null row.
    let encoded = [
        encode_avro_int(0),
        encode_avro_bytes(&[0x04, 0xD2]),
        encode_avro_int(1),
        encode_avro_int(0),
        encode_avro_bytes(&[0xFB, 0x2E]),
    ]
    .concat();
    let mut cur = AvroCursor::new(&encoded);
    for _ in 0..3 {
        nullable_decoder.decode(&mut cur).unwrap();
    }
    let flushed = nullable_decoder.flush(None).unwrap();
    // Only the concrete array type is feature-dependent.
    #[cfg(feature = "small_decimals")]
    let decimals = flushed.as_any().downcast_ref::<Decimal32Array>().unwrap();
    #[cfg(not(feature = "small_decimals"))]
    let decimals = flushed
        .as_any()
        .downcast_ref::<Decimal128Array>()
        .unwrap();
    assert_eq!(decimals.len(), 3);
    assert!(decimals.is_valid(0));
    assert!(!decimals.is_valid(1));
    assert!(decimals.is_valid(2));
    assert_eq!(decimals.value_as_string(0), "123.4");
    assert_eq!(decimals.value_as_string(2), "-123.4");
}
3528
// Decimal(precision 6, scale 2) stored in 16 bytes and wrapped in a
// hand-built null-second `Decoder::Nullable`: rows are value, null, value.
#[test]
fn test_decimal_decoding_bytes_with_nulls_fixed_size_narrow_result() {
    let decimal_dt = avro_from_codec(Codec::Decimal(6, Some(2), Some(16)));
    let value_decoder = Decoder::try_new(&decimal_dt).unwrap();
    let mut nullable_decoder = Decoder::Nullable(
        Nullability::NullSecond,
        NullBufferBuilder::new(DEFAULT_CAPACITY),
        Box::new(value_decoder),
        NullablePlan::ReadTag,
    );
    // 16-byte payloads: zero-filled ending in 0x01 0xE2 0x40, and 0xFF-filled
    // ending in 0xFE 0x1D 0xC0.
    let mut positive = [0x00u8; 16];
    positive[13..].copy_from_slice(&[0x01, 0xE2, 0x40]);
    let mut negative = [0xFFu8; 16];
    negative[13..].copy_from_slice(&[0xFE, 0x1D, 0xC0]);
    // Tag 0 precedes a value payload; tag 1 stands alone for the null row.
    let mut encoded: Vec<u8> = Vec::new();
    encoded.extend(encode_avro_int(0));
    encoded.extend_from_slice(&positive);
    encoded.extend(encode_avro_int(1));
    encoded.extend(encode_avro_int(0));
    encoded.extend_from_slice(&negative);
    let mut cur = AvroCursor::new(&encoded);
    for _ in 0..3 {
        nullable_decoder.decode(&mut cur).unwrap();
    }
    let flushed = nullable_decoder.flush(None).unwrap();
    // Only the concrete array type is feature-dependent.
    #[cfg(feature = "small_decimals")]
    let decimals = flushed.as_any().downcast_ref::<Decimal32Array>().unwrap();
    #[cfg(not(feature = "small_decimals"))]
    let decimals = flushed
        .as_any()
        .downcast_ref::<Decimal128Array>()
        .unwrap();
    assert_eq!(decimals.len(), 3);
    assert!(decimals.is_valid(0));
    assert!(!decimals.is_valid(1));
    assert!(decimals.is_valid(2));
    assert_eq!(decimals.value_as_string(0), "1234.56");
    assert_eq!(decimals.value_as_string(2), "-1234.56");
}
3579
// Enum indices 2, 0, 1 must come back as dictionary keys over the symbol
// values "A", "B", "C".
#[test]
fn test_enum_decoding() {
    let symbols: Arc<[String]> =
        Arc::from(vec!["A".to_string(), "B".to_string(), "C".to_string()]);
    let enum_dt = avro_from_codec(Codec::Enum(symbols));
    let mut enum_decoder = Decoder::try_new(&enum_dt).unwrap();
    let mut encoded: Vec<u8> = Vec::new();
    for symbol_index in [2, 0, 1] {
        encoded.extend(encode_avro_int(symbol_index));
    }
    let mut cur = AvroCursor::new(&encoded);
    for _ in 0..3 {
        enum_decoder.decode(&mut cur).unwrap();
    }
    let flushed = enum_decoder.flush(None).unwrap();
    let dict = flushed
        .as_any()
        .downcast_ref::<DictionaryArray<Int32Type>>()
        .unwrap();
    assert_eq!(dict.len(), 3);
    let dict_values = dict.values();
    let names = dict_values.as_any().downcast_ref::<StringArray>().unwrap();
    for (idx, symbol) in ["A", "B", "C"].into_iter().enumerate() {
        assert_eq!(names.value(idx), symbol);
    }
    assert_eq!(dict.keys().values(), &[2, 0, 1]);
}
3609
// Null-first nullable enum over ["X", "Y"]: rows are "Y", null, "X".
#[test]
fn test_enum_decoding_with_nulls() {
    let symbols: Arc<[String]> = Arc::from(vec!["X".to_string(), "Y".to_string()]);
    let enum_dt = AvroDataType::new(
        Codec::Enum(symbols),
        Default::default(),
        Some(Nullability::NullFirst),
    );
    let mut enum_decoder = Decoder::try_new(&enum_dt).unwrap();
    // Tag 1 is followed by an enum index; tag 0 stands alone for the null row.
    let encoded = [
        encode_avro_long(1),
        encode_avro_int(1),
        encode_avro_long(0),
        encode_avro_long(1),
        encode_avro_int(0),
    ]
    .concat();
    let mut cur = AvroCursor::new(&encoded);
    for _ in 0..3 {
        enum_decoder.decode(&mut cur).unwrap();
    }
    let flushed = enum_decoder.flush(None).unwrap();
    let dict = flushed
        .as_any()
        .downcast_ref::<DictionaryArray<Int32Type>>()
        .unwrap();
    assert_eq!(dict.len(), 3);
    assert!(dict.is_valid(0));
    assert!(dict.is_null(1));
    assert!(dict.is_valid(2));
    let keys_with_null = Int32Array::from(vec![Some(1), None, Some(0)]);
    assert_eq!(dict.keys(), &keys_with_null);
    let dict_values = dict.values();
    let names = dict_values.as_any().downcast_ref::<StringArray>().unwrap();
    assert_eq!(names.value(0), "X");
    assert_eq!(names.value(1), "Y");
}
3646
// Null-first nullable `Codec::Interval`: each value is three little-endian
// u32s. The expected array shows the third component scaled by 1_000_000
// into the nanoseconds field (3 -> 3_000_000, 6 -> 6_000_000).
#[test]
fn test_duration_decoding_with_nulls() {
    let interval_dt = AvroDataType::new(
        Codec::Interval,
        Default::default(),
        Some(Nullability::NullFirst),
    );
    let mut interval_decoder = Decoder::try_new(&interval_dt).unwrap();
    let mut encoded: Vec<u8> = Vec::new();
    // Row 1: tag 1, then the three components 1, 2, 3.
    encoded.extend(encode_avro_long(1));
    for part in [1u32, 2, 3] {
        encoded.extend_from_slice(&part.to_le_bytes());
    }
    // Row 2: tag 0 alone (the null row).
    encoded.extend(encode_avro_long(0));
    // Row 3: tag 1, then the three components 4, 5, 6.
    encoded.extend(encode_avro_long(1));
    for part in [4u32, 5, 6] {
        encoded.extend_from_slice(&part.to_le_bytes());
    }
    let mut cur = AvroCursor::new(&encoded);
    for _ in 0..3 {
        interval_decoder.decode(&mut cur).unwrap();
    }
    let flushed = interval_decoder.flush(None).unwrap();
    let intervals = flushed
        .as_any()
        .downcast_ref::<IntervalMonthDayNanoArray>()
        .unwrap();
    assert_eq!(intervals.len(), 3);
    assert!(intervals.is_valid(0));
    assert!(intervals.is_null(1));
    assert!(intervals.is_valid(2));
    let expected = IntervalMonthDayNanoArray::from(vec![
        Some(IntervalMonthDayNano::new(1, 2, 3_000_000)),
        None,
        Some(IntervalMonthDayNano::new(4, 5, 6_000_000)),
    ]);
    assert_eq!(intervals, &expected);
}
3700
// Null-first nullable `Codec::IntervalMonthDayNano`: each value is written as
// little-endian i32 months, i32 days and i64 nanoseconds, and is expected to
// round-trip unchanged, negative components included.
#[cfg(feature = "avro_custom_types")]
#[test]
fn test_interval_month_day_nano_custom_decoding_with_nulls() {
    let interval_dt = AvroDataType::new(
        Codec::IntervalMonthDayNano,
        Default::default(),
        Some(Nullability::NullFirst),
    );
    let mut interval_decoder = Decoder::try_new(&interval_dt).unwrap();
    // Appends tag 1 followed by the 16-byte interval payload.
    let push_interval = |buf: &mut Vec<u8>, months: i32, days: i32, nanos: i64| {
        buf.extend_from_slice(&encode_avro_long(1));
        buf.extend_from_slice(&months.to_le_bytes());
        buf.extend_from_slice(&days.to_le_bytes());
        buf.extend_from_slice(&nanos.to_le_bytes());
    };
    let mut encoded: Vec<u8> = Vec::new();
    push_interval(&mut encoded, 1, -2, 3);
    // Tag 0 alone is the null row.
    encoded.extend_from_slice(&encode_avro_long(0));
    push_interval(&mut encoded, -4, 5, -6);
    let mut cur = AvroCursor::new(&encoded);
    for _ in 0..3 {
        interval_decoder.decode(&mut cur).unwrap();
    }
    let flushed = interval_decoder.flush(None).unwrap();
    let intervals = flushed
        .as_any()
        .downcast_ref::<IntervalMonthDayNanoArray>()
        .unwrap();
    assert_eq!(intervals.len(), 3);
    let expected = IntervalMonthDayNanoArray::from(vec![
        Some(IntervalMonthDayNano::new(1, -2, 3)),
        None,
        Some(IntervalMonthDayNano::new(-4, 5, -6)),
    ]);
    assert_eq!(intervals, &expected);
}
3740
// Flushing a non-nullable `Codec::Interval` decoder without decoding anything
// must produce an empty array.
#[test]
fn test_duration_decoding_empty() {
    let interval_dt = AvroDataType::new(Codec::Interval, Default::default(), None);
    let mut interval_decoder = Decoder::try_new(&interval_dt).unwrap();
    let flushed = interval_decoder.flush(None).unwrap();
    assert_eq!(flushed.len(), 0);
}
3749
// `Codec::DurationSeconds` values are written as Avro longs and must surface
// unchanged in a `DurationSecondArray`.
#[test]
#[cfg(feature = "avro_custom_types")]
fn test_duration_seconds_decoding() {
    let duration_dt = AvroDataType::new(Codec::DurationSeconds, Default::default(), None);
    let mut duration_decoder = Decoder::try_new(&duration_dt).unwrap();
    let encoded: Vec<u8> = [0i64, -1, 2]
        .into_iter()
        .flat_map(|secs| encode_avro_long(secs))
        .collect();
    let mut cur = AvroCursor::new(&encoded);
    for _ in 0..3 {
        duration_decoder.decode(&mut cur).unwrap();
    }
    let flushed = duration_decoder.flush(None).unwrap();
    let durations = flushed
        .as_any()
        .downcast_ref::<DurationSecondArray>()
        .unwrap();
    assert_eq!(durations.values(), &[0, -1, 2]);
}
3771
// `Codec::DurationMillis` values are written as Avro longs and must surface
// unchanged in a `DurationMillisecondArray`.
#[test]
#[cfg(feature = "avro_custom_types")]
fn test_duration_milliseconds_decoding() {
    let duration_dt = AvroDataType::new(Codec::DurationMillis, Default::default(), None);
    let mut duration_decoder = Decoder::try_new(&duration_dt).unwrap();
    let encoded: Vec<u8> = [1i64, 0, -2]
        .into_iter()
        .flat_map(|millis| encode_avro_long(millis))
        .collect();
    let mut cur = AvroCursor::new(&encoded);
    let mut remaining = 3;
    while remaining > 0 {
        duration_decoder.decode(&mut cur).unwrap();
        remaining -= 1;
    }
    let flushed = duration_decoder.flush(None).unwrap();
    let durations = flushed
        .as_any()
        .downcast_ref::<DurationMillisecondArray>()
        .unwrap();
    assert_eq!(durations.values(), &[1, 0, -2]);
}
3792
// `Codec::DurationMicros` values are written as Avro longs and must surface
// unchanged in a `DurationMicrosecondArray`.
#[test]
#[cfg(feature = "avro_custom_types")]
fn test_duration_microseconds_decoding() {
    let duration_dt = AvroDataType::new(Codec::DurationMicros, Default::default(), None);
    let mut duration_decoder = Decoder::try_new(&duration_dt).unwrap();
    let encoded: Vec<u8> = [5i64, -6, 7]
        .into_iter()
        .flat_map(|micros| encode_avro_long(micros))
        .collect();
    let mut cur = AvroCursor::new(&encoded);
    let mut remaining = 3;
    while remaining > 0 {
        duration_decoder.decode(&mut cur).unwrap();
        remaining -= 1;
    }
    let flushed = duration_decoder.flush(None).unwrap();
    let durations = flushed
        .as_any()
        .downcast_ref::<DurationMicrosecondArray>()
        .unwrap();
    assert_eq!(durations.values(), &[5, -6, 7]);
}
3813
// `Codec::DurationNanos` values are written as Avro longs and must surface
// unchanged in a `DurationNanosecondArray`.
#[test]
#[cfg(feature = "avro_custom_types")]
fn test_duration_nanoseconds_decoding() {
    let duration_dt = AvroDataType::new(Codec::DurationNanos, Default::default(), None);
    let mut duration_decoder = Decoder::try_new(&duration_dt).unwrap();
    let encoded: Vec<u8> = [8i64, 9, -10]
        .into_iter()
        .flat_map(|nanos| encode_avro_long(nanos))
        .collect();
    let mut cur = AvroCursor::new(&encoded);
    let mut remaining = 3;
    while remaining > 0 {
        duration_decoder.decode(&mut cur).unwrap();
        remaining -= 1;
    }
    let flushed = duration_decoder.flush(None).unwrap();
    let durations = flushed
        .as_any()
        .downcast_ref::<DurationNanosecondArray>()
        .unwrap();
    assert_eq!(durations.values(), &[8, 9, -10]);
}
3834
// A failed decode on a nullable decoder must not leave a stray entry behind:
// after null, <error>, 42 the flushed array holds exactly two rows.
#[test]
fn test_nullable_decode_error_bitmap_corruption() {
    let nullable_int = AvroDataType::new(
        Codec::Int32,
        Default::default(),
        Some(Nullability::NullSecond),
    );
    let mut nullable_decoder = Decoder::try_new(&nullable_int).unwrap();

    // Tag 1 alone: decodes as the null row.
    let null_row = encode_avro_int(1);
    // Tag 0 with no bytes after it: decoding is expected to fail.
    let truncated_row = encode_avro_int(0);
    // Tag 0 followed by the value 42.
    let mut value_row = encode_avro_int(0);
    value_row.extend_from_slice(&encode_avro_int(42));

    let mut null_cur = AvroCursor::new(&null_row);
    nullable_decoder.decode(&mut null_cur).unwrap();
    let mut truncated_cur = AvroCursor::new(&truncated_row);
    assert!(nullable_decoder.decode(&mut truncated_cur).is_err());
    let mut value_cur = AvroCursor::new(&value_row);
    nullable_decoder.decode(&mut value_cur).unwrap();

    let flushed = nullable_decoder.flush(None).unwrap();

    // Only the two successful rows may be present.
    assert_eq!(flushed.len(), 2);
    let ints = flushed.as_any().downcast_ref::<Int32Array>().unwrap();
    assert!(ints.is_null(0));
    assert_eq!(ints.value(1), 42);
}
3870
// With reader symbols ["B", "C", "A"] and mapping [2, 0, 1], written indices
// 0, 1, 2 must come out as dictionary keys 2, 0, 1.
#[test]
fn test_enum_mapping_reordered_symbols() {
    let reader_symbols: Arc<[String]> =
        Arc::from(vec!["B".to_string(), "C".to_string(), "A".to_string()]);
    let resolution = EnumResolution {
        mapping: Arc::from(vec![2, 0, 1]),
        default_index: -1,
    };
    let mut enum_decoder = Decoder::Enum(
        Vec::with_capacity(DEFAULT_CAPACITY),
        reader_symbols,
        Some(resolution),
    );
    let mut encoded: Vec<u8> = Vec::new();
    for writer_index in [0, 1, 2] {
        encoded.extend(encode_avro_int(writer_index));
    }
    let mut cursor = AvroCursor::new(&encoded);
    for _ in 0..3 {
        enum_decoder.decode(&mut cursor).unwrap();
    }
    let flushed = enum_decoder.flush(None).unwrap();
    let dict = flushed
        .as_any()
        .downcast_ref::<DictionaryArray<Int32Type>>()
        .unwrap();
    let remapped_keys = Int32Array::from(vec![2, 0, 1]);
    assert_eq!(dict.keys(), &remapped_keys);
    let dict_values = dict.values();
    let names = dict_values.as_any().downcast_ref::<StringArray>().unwrap();
    for (idx, symbol) in ["B", "C", "A"].into_iter().enumerate() {
        assert_eq!(names.value(idx), symbol);
    }
}
3909
// With mapping [0, 1] and default index 1, a written index of 99 (beyond the
// mapping) must resolve to the default key 1.
#[test]
fn test_enum_mapping_unknown_symbol_and_out_of_range_fall_back_to_default() {
    let reader_symbols: Arc<[String]> = Arc::from(vec!["A".to_string(), "B".to_string()]);
    let resolution = EnumResolution {
        mapping: Arc::from(vec![0, 1]),
        default_index: 1,
    };
    let mut enum_decoder = Decoder::Enum(
        Vec::with_capacity(DEFAULT_CAPACITY),
        reader_symbols,
        Some(resolution),
    );
    let mut encoded: Vec<u8> = Vec::new();
    for writer_index in [0, 1, 99] {
        encoded.extend(encode_avro_int(writer_index));
    }
    let mut cursor = AvroCursor::new(&encoded);
    for _ in 0..3 {
        enum_decoder.decode(&mut cursor).unwrap();
    }
    let flushed = enum_decoder.flush(None).unwrap();
    let dict = flushed
        .as_any()
        .downcast_ref::<DictionaryArray<Int32Type>>()
        .unwrap();
    let keys_with_fallback = Int32Array::from(vec![0, 1, 1]);
    assert_eq!(dict.keys(), &keys_with_fallback);
    let dict_values = dict.values();
    let names = dict_values.as_any().downcast_ref::<StringArray>().unwrap();
    assert_eq!(names.value(0), "A");
    assert_eq!(names.value(1), "B");
}
3946
// With mapping [-1] and default index -1, decoding written index 0 must fail
// with an error mentioning both "not resolvable" and "no default".
#[test]
fn test_enum_mapping_unknown_symbol_without_default_errors() {
    let reader_symbols: Arc<[String]> = Arc::from(vec!["A".to_string()]);
    let resolution = EnumResolution {
        mapping: Arc::from(vec![-1]),
        default_index: -1,
    };
    let mut enum_decoder = Decoder::Enum(
        Vec::with_capacity(DEFAULT_CAPACITY),
        reader_symbols,
        Some(resolution),
    );
    let encoded = encode_avro_int(0);
    let mut cursor = AvroCursor::new(&encoded);
    let msg = enum_decoder
        .decode(&mut cursor)
        .expect_err("expected decode error for unresolved enum without default")
        .to_string();
    let mentions_unresolvable = msg.contains("not resolvable");
    assert!(
        mentions_unresolvable && msg.contains("no default"),
        "unexpected error message: {msg}"
    );
}
3971
3972 fn make_record_resolved_decoder(
3973 reader_fields: &[(&str, DataType, bool)],
3974 writer_to_reader: Vec<Option<usize>>,
3975 skip_decoders: Vec<Option<Skipper>>,
3976 ) -> Decoder {
3977 let mut field_refs: Vec<FieldRef> = Vec::with_capacity(reader_fields.len());
3978 let mut encodings: Vec<Decoder> = Vec::with_capacity(reader_fields.len());
3979 for (name, dt, nullable) in reader_fields {
3980 field_refs.push(Arc::new(ArrowField::new(*name, dt.clone(), *nullable)));
3981 let enc = match dt {
3982 DataType::Int32 => Decoder::Int32(Vec::new()),
3983 DataType::Int64 => Decoder::Int64(Vec::new()),
3984 DataType::Utf8 => {
3985 Decoder::String(OffsetBufferBuilder::new(DEFAULT_CAPACITY), Vec::new())
3986 }
3987 other => panic!("Unsupported test reader field type: {other:?}"),
3988 };
3989 encodings.push(enc);
3990 }
3991 let fields: Fields = field_refs.into();
3992 Decoder::Record(
3993 fields,
3994 encodings,
3995 Some(Projector {
3996 writer_to_reader: Arc::from(writer_to_reader),
3997 skip_decoders,
3998 field_defaults: vec![None; reader_fields.len()],
3999 default_injections: Arc::from(Vec::<(usize, AvroLiteral)>::new()),
4000 }),
4001 )
4002 }
4003
/// A writer int field that trails the projected reader field is skipped: only
/// `id` is materialised, yet the cursor must end past the skipped value.
#[test]
fn test_skip_writer_trailing_field_int32() {
    let mut dec = make_record_resolved_decoder(
        &[("id", arrow_schema::DataType::Int32, false)],
        vec![Some(0), None],
        vec![None, Some(super::Skipper::Int32)],
    );
    // Writer row: id = 7, followed by an int (999) the reader does not project.
    let mut data = Vec::new();
    for value in [7, 999] {
        data.extend_from_slice(&encode_avro_int(value));
    }
    let mut cur = AvroCursor::new(&data);
    dec.decode(&mut cur).unwrap();
    assert_eq!(cur.position(), data.len());
    let flushed = dec.flush(None).unwrap();
    let record = flushed.as_any().downcast_ref::<StructArray>().unwrap();
    assert_eq!(record.len(), 1);
    let id_column = record.column_by_name("id").unwrap();
    let ids = id_column.as_any().downcast_ref::<Int32Array>().unwrap();
    assert_eq!(ids.value(0), 7);
}
4028
/// A writer string field sitting between two projected fields is skipped while
/// the surrounding `id` and `score` values are decoded.
#[test]
fn test_skip_writer_middle_field_string() {
    let mut dec = make_record_resolved_decoder(
        &[
            ("id", DataType::Int32, false),
            ("score", DataType::Int64, false),
        ],
        vec![Some(0), None, Some(1)],
        vec![None, Some(Skipper::String), None],
    );
    // Writer row: id = 42, an unprojected string "abcdef", score = 1000.
    let data = [
        encode_avro_int(42),
        encode_avro_bytes(b"abcdef"),
        encode_avro_long(1000),
    ]
    .concat();
    let mut cur = AvroCursor::new(&data);
    dec.decode(&mut cur).unwrap();
    assert_eq!(cur.position(), data.len());
    let flushed = dec.flush(None).unwrap();
    let record = flushed.as_any().downcast_ref::<StructArray>().unwrap();
    let id_column = record.column_by_name("id").unwrap();
    let ids = id_column.as_any().downcast_ref::<Int32Array>().unwrap();
    let score_column = record.column_by_name("score").unwrap();
    let scores = score_column
        .as_any()
        .downcast_ref::<Int64Array>()
        .unwrap();
    assert_eq!(ids.value(0), 42);
    assert_eq!(scores.value(0), 1000);
}
4063
/// Skips a writer array encoded as a block with a negative item count followed
/// by the block's byte size, then decodes the projected `id` that follows.
#[test]
fn test_skip_writer_array_with_negative_block_count_fast() {
    let mut dec = make_record_resolved_decoder(
        &[("id", DataType::Int32, false)],
        vec![None, Some(0)],
        vec![Some(super::Skipper::List(Box::new(Skipper::Int32))), None],
    );
    // Three array items: 1, 2, 3.
    let mut items = Vec::new();
    for item in 1..=3 {
        items.extend_from_slice(&encode_avro_int(item));
    }
    // Layout: count (-3), byte size, items, end-of-array marker (0), id = 5.
    let mut data = Vec::new();
    data.extend_from_slice(&encode_avro_long(-3));
    data.extend_from_slice(&encode_avro_long(items.len() as i64));
    data.extend_from_slice(&items);
    data.extend_from_slice(&encode_avro_long(0));
    data.extend_from_slice(&encode_avro_int(5));
    let mut cur = AvroCursor::new(&data);
    dec.decode(&mut cur).unwrap();
    assert_eq!(cur.position(), data.len());
    let flushed = dec.flush(None).unwrap();
    let record = flushed.as_any().downcast_ref::<StructArray>().unwrap();
    let id_column = record.column_by_name("id").unwrap();
    let ids = id_column.as_any().downcast_ref::<Int32Array>().unwrap();
    assert_eq!(ids.len(), 1);
    assert_eq!(ids.value(0), 5);
}
4095
/// Skips a writer map encoded as a block with a negative entry count followed
/// by the block's byte size, then decodes the projected `id` that follows.
#[test]
fn test_skip_writer_map_with_negative_block_count_fast() {
    let mut dec = make_record_resolved_decoder(
        &[("id", DataType::Int32, false)],
        vec![None, Some(0)],
        vec![Some(Skipper::Map(Box::new(Skipper::Int32))), None],
    );
    // Two map entries: "k1" -> 10 and "k2" -> 20.
    let entries = [
        encode_avro_bytes(b"k1"),
        encode_avro_int(10),
        encode_avro_bytes(b"k2"),
        encode_avro_int(20),
    ]
    .concat();
    // Layout: count (-2), byte size, entries, end-of-map marker (0), id = 123.
    let mut data = Vec::new();
    data.extend_from_slice(&encode_avro_long(-2));
    data.extend_from_slice(&encode_avro_long(entries.len() as i64));
    data.extend_from_slice(&entries);
    data.extend_from_slice(&encode_avro_long(0));
    data.extend_from_slice(&encode_avro_int(123));
    let mut cur = AvroCursor::new(&data);
    dec.decode(&mut cur).unwrap();
    assert_eq!(cur.position(), data.len());
    let flushed = dec.flush(None).unwrap();
    let record = flushed.as_any().downcast_ref::<StructArray>().unwrap();
    let id_column = record.column_by_name("id").unwrap();
    let ids = id_column.as_any().downcast_ref::<Int32Array>().unwrap();
    assert_eq!(ids.len(), 1);
    assert_eq!(ids.value(0), 123);
}
4128
/// Skips a nullable (null-first) writer int in both of its shapes: branch 0
/// with no payload, and branch 1 followed by an int. The projected `id` must
/// be decoded correctly after each.
#[test]
fn test_skip_writer_nullable_field_union_nullfirst() {
    let skipped_field = super::Skipper::Nullable(
        Nullability::NullFirst,
        Box::new(super::Skipper::Int32),
    );
    let mut dec = make_record_resolved_decoder(
        &[("id", DataType::Int32, false)],
        vec![None, Some(0)],
        vec![Some(skipped_field), None],
    );
    // Row 1: branch 0 (no payload), then id = 5.
    let null_row = [encode_avro_long(0), encode_avro_int(5)].concat();
    // Row 2: branch 1 with value 123, then id = 7.
    let value_row = [
        encode_avro_long(1),
        encode_avro_int(123),
        encode_avro_int(7),
    ]
    .concat();
    let mut null_cur = AvroCursor::new(&null_row);
    let mut value_cur = AvroCursor::new(&value_row);
    dec.decode(&mut null_cur).unwrap();
    dec.decode(&mut value_cur).unwrap();
    assert_eq!(null_cur.position(), null_row.len());
    assert_eq!(value_cur.position(), value_row.len());
    let flushed = dec.flush(None).unwrap();
    let record = flushed.as_any().downcast_ref::<StructArray>().unwrap();
    let id_column = record.column_by_name("id").unwrap();
    let ids = id_column.as_any().downcast_ref::<Int32Array>().unwrap();
    assert_eq!(ids.len(), 2);
    assert_eq!(ids.value(0), 5);
    assert_eq!(ids.value(1), 7);
}
4167
4168 fn make_dense_union_avro(
4169 children: Vec<(Codec, &'_ str, DataType)>,
4170 type_ids: Vec<i8>,
4171 ) -> AvroDataType {
4172 let mut avro_children: Vec<AvroDataType> = Vec::with_capacity(children.len());
4173 let mut fields: Vec<arrow_schema::Field> = Vec::with_capacity(children.len());
4174 for (codec, name, dt) in children.into_iter() {
4175 avro_children.push(AvroDataType::new(codec, Default::default(), None));
4176 fields.push(arrow_schema::Field::new(name, dt, true));
4177 }
4178 let union_fields = UnionFields::try_new(type_ids, fields).unwrap();
4179 let union_codec = Codec::Union(avro_children.into(), union_fields, UnionMode::Dense);
4180 AvroDataType::new(union_codec, Default::default(), None)
4181 }
4182
/// Dense union of int/string with custom type ids (2 and 5): decoded rows must
/// report the custom ids and per-child offsets, and each child array must hold
/// only its own values.
#[test]
fn test_union_dense_two_children_custom_type_ids() {
    let union_dt = make_dense_union_avro(
        vec![
            (Codec::Int32, "i", DataType::Int32),
            (Codec::Utf8, "s", DataType::Utf8),
        ],
        vec![2, 5],
    );
    let mut dec = Decoder::try_new(&union_dt).unwrap();
    // Rows: branch 0 -> 7, branch 1 -> "x", branch 0 -> -1.
    let row_int = [encode_avro_long(0), encode_avro_int(7)].concat();
    let row_str = [encode_avro_long(1), encode_avro_bytes(b"x")].concat();
    let row_neg = [encode_avro_long(0), encode_avro_int(-1)].concat();
    for row in [&row_int, &row_str, &row_neg] {
        dec.decode(&mut AvroCursor::new(row)).unwrap();
    }
    let array = dec.flush(None).unwrap();
    let ua = array
        .as_any()
        .downcast_ref::<UnionArray>()
        .expect("expected UnionArray");
    assert_eq!(ua.len(), 3);
    assert_eq!(ua.type_id(0), 2);
    assert_eq!(ua.type_id(1), 5);
    assert_eq!(ua.type_id(2), 2);
    assert_eq!(ua.value_offset(0), 0);
    assert_eq!(ua.value_offset(1), 0);
    assert_eq!(ua.value_offset(2), 1);
    let ints = ua.child(2);
    let ints = ints
        .as_any()
        .downcast_ref::<Int32Array>()
        .expect("int child");
    assert_eq!(ints.len(), 2);
    assert_eq!(ints.value(0), 7);
    assert_eq!(ints.value(1), -1);
    let strings = ua.child(5);
    let strings = strings
        .as_any()
        .downcast_ref::<StringArray>()
        .expect("string child");
    assert_eq!(strings.len(), 1);
    assert_eq!(strings.value(0), "x");
}
4233
/// Dense union of null/string with type ids 42 and 7: null rows land in the
/// `NullArray` child, string rows in the `StringArray` child.
#[test]
fn test_union_dense_with_null_and_string_children() {
    let union_dt = make_dense_union_avro(
        vec![
            (Codec::Null, "n", DataType::Null),
            (Codec::Utf8, "s", DataType::Utf8),
        ],
        vec![42, 7],
    );
    let mut dec = Decoder::try_new(&union_dt).unwrap();
    // Rows: branch 0 (null), branch 1 -> "abc", branch 0 (null).
    let first_null = encode_avro_long(0);
    let text_row = [encode_avro_long(1), encode_avro_bytes(b"abc")].concat();
    let second_null = encode_avro_long(0);
    for row in [&first_null, &text_row, &second_null] {
        dec.decode(&mut AvroCursor::new(row)).unwrap();
    }
    let array = dec.flush(None).unwrap();
    let ua = array
        .as_any()
        .downcast_ref::<UnionArray>()
        .expect("expected UnionArray");
    assert_eq!(ua.len(), 3);
    assert_eq!(ua.type_id(0), 42);
    assert_eq!(ua.type_id(1), 7);
    assert_eq!(ua.type_id(2), 42);
    assert_eq!(ua.value_offset(0), 0);
    assert_eq!(ua.value_offset(1), 0);
    assert_eq!(ua.value_offset(2), 1);
    let nulls = ua.child(42);
    let nulls = nulls
        .as_any()
        .downcast_ref::<NullArray>()
        .expect("null child");
    assert_eq!(nulls.len(), 2);
    let strings = ua.child(7);
    let strings = strings
        .as_any()
        .downcast_ref::<StringArray>()
        .expect("string child");
    assert_eq!(strings.len(), 1);
    assert_eq!(strings.value(0), "abc");
}
4278
/// A negative union branch index in the input must be rejected with a
/// "Negative union branch index" error.
#[test]
fn test_union_decode_negative_branch_index_errors() {
    let branches = vec![
        (Codec::Int32, "i", DataType::Int32),
        (Codec::Utf8, "s", DataType::Utf8),
    ];
    let union_dt = make_dense_union_avro(branches, vec![0, 1]);
    let mut dec = Decoder::try_new(&union_dt).unwrap();
    // The encoded branch index is -1.
    let row = encode_avro_long(-1);
    let msg = dec
        .decode(&mut AvroCursor::new(&row))
        .expect_err("expected error for negative branch index")
        .to_string();
    assert!(
        msg.contains("Negative union branch index"),
        "unexpected error message: {msg}"
    );
}
4299
/// A branch index equal to the number of union children (2 for a two-branch
/// union) must be rejected with an "out of range" error.
#[test]
fn test_union_decode_out_of_range_branch_index_errors() {
    let branches = vec![
        (Codec::Int32, "i", DataType::Int32),
        (Codec::Utf8, "s", DataType::Utf8),
    ];
    let union_dt = make_dense_union_avro(branches, vec![10, 11]);
    let mut dec = Decoder::try_new(&union_dt).unwrap();
    let row = encode_avro_long(2);
    let msg = dec
        .decode(&mut AvroCursor::new(&row))
        .expect_err("expected error for out-of-range branch index")
        .to_string();
    assert!(
        msg.contains("out of range"),
        "unexpected error message: {msg}"
    );
}
4320
/// Building a decoder for a union codec in `UnionMode::Sparse` must fail with
/// the "not yet supported" error.
#[test]
fn test_union_sparse_mode_not_supported() {
    let arrow_fields = vec![
        arrow_schema::Field::new("i", DataType::Int32, true),
        arrow_schema::Field::new("s", DataType::Utf8, true),
    ];
    let uf = UnionFields::try_new(vec![1, 3], arrow_fields).unwrap();
    let children: Vec<AvroDataType> = vec![
        AvroDataType::new(Codec::Int32, Default::default(), None),
        AvroDataType::new(Codec::Utf8, Default::default(), None),
    ];
    let dt = AvroDataType::new(
        Codec::Union(children.into(), uf, UnionMode::Sparse),
        Default::default(),
        None,
    );
    let msg = Decoder::try_new(&dt)
        .expect_err("sparse union should not be supported")
        .to_string();
    assert!(
        msg.contains("Sparse Arrow unions are not yet supported"),
        "unexpected error message: {msg}"
    );
}
4344
4345 fn make_record_decoder_with_projector_defaults(
4346 reader_fields: &[(&str, DataType, bool)],
4347 field_defaults: Vec<Option<AvroLiteral>>,
4348 default_injections: Vec<(usize, AvroLiteral)>,
4349 writer_to_reader_len: usize,
4350 ) -> Decoder {
4351 assert_eq!(
4352 field_defaults.len(),
4353 reader_fields.len(),
4354 "field_defaults must have one entry per reader field"
4355 );
4356 let mut field_refs: Vec<FieldRef> = Vec::with_capacity(reader_fields.len());
4357 let mut encodings: Vec<Decoder> = Vec::with_capacity(reader_fields.len());
4358 for (name, dt, nullable) in reader_fields {
4359 field_refs.push(Arc::new(ArrowField::new(*name, dt.clone(), *nullable)));
4360 let enc = match dt {
4361 DataType::Int32 => Decoder::Int32(Vec::with_capacity(DEFAULT_CAPACITY)),
4362 DataType::Int64 => Decoder::Int64(Vec::with_capacity(DEFAULT_CAPACITY)),
4363 DataType::Utf8 => Decoder::String(
4364 OffsetBufferBuilder::new(DEFAULT_CAPACITY),
4365 Vec::with_capacity(DEFAULT_CAPACITY),
4366 ),
4367 other => panic!("Unsupported test field type in helper: {other:?}"),
4368 };
4369 encodings.push(enc);
4370 }
4371 let fields: Fields = field_refs.into();
4372 let skip_decoders: Vec<Option<Skipper>> =
4373 (0..writer_to_reader_len).map(|_| None::<Skipper>).collect();
4374 let projector = Projector {
4375 writer_to_reader: Arc::from(vec![None; writer_to_reader_len]),
4376 skip_decoders,
4377 field_defaults,
4378 default_injections: Arc::from(default_injections),
4379 };
4380 Decoder::Record(fields, encodings, Some(projector))
4381 }
4382
/// `append_default` on the custom narrow integer decoders accepts both range
/// bounds, rejects the values just outside them with an "out of range for
/// <type>" error, and leaves only the accepted values in the flushed array.
#[cfg(feature = "avro_custom_types")]
#[test]
fn test_default_append_custom_integer_range_validation() {
    // Int8: bounds accepted, then MAX + 1 and MIN - 1 rejected.
    {
        let mut dec = Decoder::Int8(Vec::with_capacity(DEFAULT_CAPACITY));
        for accepted in [i8::MIN as i32, i8::MAX as i32] {
            dec.append_default(&AvroLiteral::Int(accepted)).unwrap();
        }
        for rejected in [i8::MAX as i32 + 1, i8::MIN as i32 - 1] {
            let err = dec
                .append_default(&AvroLiteral::Int(rejected))
                .unwrap_err();
            assert!(err.to_string().contains("out of range for i8"));
        }
        let flushed = dec.flush(None).unwrap();
        let stored = flushed.as_any().downcast_ref::<Int8Array>().unwrap();
        assert_eq!(stored.values(), &[i8::MIN, i8::MAX]);
    }

    // Int16: bounds accepted, then MAX + 1 and MIN - 1 rejected.
    {
        let mut dec = Decoder::Int16(Vec::with_capacity(DEFAULT_CAPACITY));
        for accepted in [i16::MIN as i32, i16::MAX as i32] {
            dec.append_default(&AvroLiteral::Int(accepted)).unwrap();
        }
        for rejected in [i16::MAX as i32 + 1, i16::MIN as i32 - 1] {
            let err = dec
                .append_default(&AvroLiteral::Int(rejected))
                .unwrap_err();
            assert!(err.to_string().contains("out of range for i16"));
        }
        let flushed = dec.flush(None).unwrap();
        let stored = flushed.as_any().downcast_ref::<Int16Array>().unwrap();
        assert_eq!(stored.values(), &[i16::MIN, i16::MAX]);
    }

    // UInt8: 0 and MAX accepted, then -1 and MAX + 1 rejected.
    {
        let mut dec = Decoder::UInt8(Vec::with_capacity(DEFAULT_CAPACITY));
        for accepted in [0, u8::MAX as i32] {
            dec.append_default(&AvroLiteral::Int(accepted)).unwrap();
        }
        for rejected in [-1, u8::MAX as i32 + 1] {
            let err = dec
                .append_default(&AvroLiteral::Int(rejected))
                .unwrap_err();
            assert!(err.to_string().contains("out of range for u8"));
        }
        let flushed = dec.flush(None).unwrap();
        let stored = flushed.as_any().downcast_ref::<UInt8Array>().unwrap();
        assert_eq!(stored.values(), &[0, u8::MAX]);
    }

    // UInt16: 0 and MAX accepted, then -1 and MAX + 1 rejected.
    {
        let mut dec = Decoder::UInt16(Vec::with_capacity(DEFAULT_CAPACITY));
        for accepted in [0, u16::MAX as i32] {
            dec.append_default(&AvroLiteral::Int(accepted)).unwrap();
        }
        for rejected in [-1, u16::MAX as i32 + 1] {
            let err = dec
                .append_default(&AvroLiteral::Int(rejected))
                .unwrap_err();
            assert!(err.to_string().contains("out of range for u16"));
        }
        let flushed = dec.flush(None).unwrap();
        let stored = flushed.as_any().downcast_ref::<UInt16Array>().unwrap();
        assert_eq!(stored.values(), &[0, u16::MAX]);
    }

    // UInt32 takes long literals: 0 and MAX accepted, then -1 and MAX + 1 rejected.
    {
        let mut dec = Decoder::UInt32(Vec::with_capacity(DEFAULT_CAPACITY));
        for accepted in [0, u32::MAX as i64] {
            dec.append_default(&AvroLiteral::Long(accepted)).unwrap();
        }
        for rejected in [-1, u32::MAX as i64 + 1] {
            let err = dec
                .append_default(&AvroLiteral::Long(rejected))
                .unwrap_err();
            assert!(err.to_string().contains("out of range for u32"));
        }
        let flushed = dec.flush(None).unwrap();
        let stored = flushed.as_any().downcast_ref::<UInt32Array>().unwrap();
        assert_eq!(stored.values(), &[0, u32::MAX]);
    }
}
4466
/// `decode` on the custom narrow integer decoders accepts both range bounds,
/// rejects encoded values just outside them with an "out of range for <type>"
/// error, and leaves only the accepted values in the flushed array.
#[cfg(feature = "avro_custom_types")]
#[test]
fn test_decode_custom_integer_range_validation() {
    // Int8: bounds accepted, then MAX + 1 and MIN - 1 rejected.
    {
        let mut dec = Decoder::try_new(&avro_from_codec(Codec::Int8)).unwrap();
        for accepted in [i8::MIN as i32, i8::MAX as i32] {
            dec.decode(&mut AvroCursor::new(&encode_avro_int(accepted)))
                .unwrap();
        }
        for rejected in [i8::MAX as i32 + 1, i8::MIN as i32 - 1] {
            let err = dec
                .decode(&mut AvroCursor::new(&encode_avro_int(rejected)))
                .unwrap_err();
            assert!(err.to_string().contains("out of range for i8"));
        }
        let flushed = dec.flush(None).unwrap();
        let stored = flushed.as_any().downcast_ref::<Int8Array>().unwrap();
        assert_eq!(stored.values(), &[i8::MIN, i8::MAX]);
    }

    // Int16: bounds accepted, then MAX + 1 and MIN - 1 rejected.
    {
        let mut dec = Decoder::try_new(&avro_from_codec(Codec::Int16)).unwrap();
        for accepted in [i16::MIN as i32, i16::MAX as i32] {
            dec.decode(&mut AvroCursor::new(&encode_avro_int(accepted)))
                .unwrap();
        }
        for rejected in [i16::MAX as i32 + 1, i16::MIN as i32 - 1] {
            let err = dec
                .decode(&mut AvroCursor::new(&encode_avro_int(rejected)))
                .unwrap_err();
            assert!(err.to_string().contains("out of range for i16"));
        }
        let flushed = dec.flush(None).unwrap();
        let stored = flushed.as_any().downcast_ref::<Int16Array>().unwrap();
        assert_eq!(stored.values(), &[i16::MIN, i16::MAX]);
    }

    // UInt8: 0 and MAX accepted, then -1 and MAX + 1 rejected.
    {
        let mut dec = Decoder::try_new(&avro_from_codec(Codec::UInt8)).unwrap();
        for accepted in [0, u8::MAX as i32] {
            dec.decode(&mut AvroCursor::new(&encode_avro_int(accepted)))
                .unwrap();
        }
        for rejected in [-1, u8::MAX as i32 + 1] {
            let err = dec
                .decode(&mut AvroCursor::new(&encode_avro_int(rejected)))
                .unwrap_err();
            assert!(err.to_string().contains("out of range for u8"));
        }
        let flushed = dec.flush(None).unwrap();
        let stored = flushed.as_any().downcast_ref::<UInt8Array>().unwrap();
        assert_eq!(stored.values(), &[0, u8::MAX]);
    }

    // UInt16: 0 and MAX accepted, then -1 and MAX + 1 rejected.
    {
        let mut dec = Decoder::try_new(&avro_from_codec(Codec::UInt16)).unwrap();
        for accepted in [0, u16::MAX as i32] {
            dec.decode(&mut AvroCursor::new(&encode_avro_int(accepted)))
                .unwrap();
        }
        for rejected in [-1, u16::MAX as i32 + 1] {
            let err = dec
                .decode(&mut AvroCursor::new(&encode_avro_int(rejected)))
                .unwrap_err();
            assert!(err.to_string().contains("out of range for u16"));
        }
        let flushed = dec.flush(None).unwrap();
        let stored = flushed.as_any().downcast_ref::<UInt16Array>().unwrap();
        assert_eq!(stored.values(), &[0, u16::MAX]);
    }

    // UInt32 is long-encoded: 0 and MAX accepted, then -1 and MAX + 1 rejected.
    {
        let mut dec = Decoder::try_new(&avro_from_codec(Codec::UInt32)).unwrap();
        for accepted in [0, u32::MAX as i64] {
            dec.decode(&mut AvroCursor::new(&encode_avro_long(accepted)))
                .unwrap();
        }
        for rejected in [-1, u32::MAX as i64 + 1] {
            let err = dec
                .decode(&mut AvroCursor::new(&encode_avro_long(rejected)))
                .unwrap_err();
            assert!(err.to_string().contains("out of range for u32"));
        }
        let flushed = dec.flush(None).unwrap();
        let stored = flushed.as_any().downcast_ref::<UInt32Array>().unwrap();
        assert_eq!(stored.values(), &[0, u32::MAX]);
    }
}
4561
/// An `Int32` decoder accepts an int default; an `Int64` decoder accepts both
/// int and long defaults.
#[test]
fn test_default_append_int32_and_int64_from_int_and_long() {
    // Int32 decoder fed an int literal.
    let mut int_dec = Decoder::Int32(Vec::with_capacity(DEFAULT_CAPACITY));
    int_dec.append_default(&AvroLiteral::Int(42)).unwrap();
    let int_flushed = int_dec.flush(None).unwrap();
    let ints = int_flushed.as_any().downcast_ref::<Int32Array>().unwrap();
    assert_eq!(ints.len(), 1);
    assert_eq!(ints.value(0), 42);

    // Int64 decoder fed an int literal, then a long literal.
    let mut long_dec = Decoder::Int64(Vec::with_capacity(DEFAULT_CAPACITY));
    for literal in [AvroLiteral::Int(5), AvroLiteral::Long(7)] {
        long_dec.append_default(&literal).unwrap();
    }
    let long_flushed = long_dec.flush(None).unwrap();
    let longs = long_flushed.as_any().downcast_ref::<Int64Array>().unwrap();
    assert_eq!(longs.len(), 2);
    assert_eq!(longs.value(0), 5);
    assert_eq!(longs.value(1), 7);
}
4579
/// Float and double default literals are appended unchanged to `Float32` and
/// `Float64` decoders.
#[test]
fn test_default_append_floats_and_doubles() {
    let mut float_dec = Decoder::Float32(Vec::with_capacity(DEFAULT_CAPACITY));
    float_dec.append_default(&AvroLiteral::Float(1.5)).unwrap();
    let float_flushed = float_dec.flush(None).unwrap();
    let floats = float_flushed
        .as_any()
        .downcast_ref::<Float32Array>()
        .unwrap();
    assert_eq!(floats.value(0), 1.5);

    let mut double_dec = Decoder::Float64(Vec::with_capacity(DEFAULT_CAPACITY));
    double_dec
        .append_default(&AvroLiteral::Double(2.25))
        .unwrap();
    let double_flushed = double_dec.flush(None).unwrap();
    let doubles = double_flushed
        .as_any()
        .downcast_ref::<Float64Array>()
        .unwrap();
    assert_eq!(doubles.value(0), 2.25);
}
4593
/// String defaults go to string decoders and bytes defaults to binary
/// decoders; a bytes literal offered to a string decoder is a type error.
#[test]
fn test_default_append_string_and_bytes() {
    // String literal -> String decoder.
    let mut string_dec = Decoder::String(
        OffsetBufferBuilder::new(DEFAULT_CAPACITY),
        Vec::with_capacity(DEFAULT_CAPACITY),
    );
    let text = AvroLiteral::String("hi".into());
    string_dec.append_default(&text).unwrap();
    let string_flushed = string_dec.flush(None).unwrap();
    let strings = string_flushed
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    assert_eq!(strings.value(0), "hi");

    // Bytes literal -> Binary decoder.
    let mut binary_dec = Decoder::Binary(
        OffsetBufferBuilder::new(DEFAULT_CAPACITY),
        Vec::with_capacity(DEFAULT_CAPACITY),
    );
    let blob = AvroLiteral::Bytes(vec![1, 2, 3]);
    binary_dec.append_default(&blob).unwrap();
    let binary_flushed = binary_dec.flush(None).unwrap();
    let binaries = binary_flushed
        .as_any()
        .downcast_ref::<BinaryArray>()
        .unwrap();
    assert_eq!(binaries.value(0), &[1, 2, 3]);

    // Bytes literal -> String decoder must be rejected.
    let mut mismatched_dec = Decoder::String(
        OffsetBufferBuilder::new(DEFAULT_CAPACITY),
        Vec::with_capacity(DEFAULT_CAPACITY),
    );
    let wrong_kind = AvroLiteral::Bytes(vec![0x61, 0x62]);
    let err = mismatched_dec.append_default(&wrong_kind).unwrap_err();
    assert!(
        err.to_string()
            .contains("Default for string must be string"),
        "unexpected error: {err:?}"
    );
}
4629
/// A nullable Int32 decoder accepts both a null default and an int default,
/// producing a null slot followed by the value.
#[test]
fn test_default_append_nullable_int32_null_and_value() {
    let mut dec = Decoder::Nullable(
        Nullability::NullFirst,
        NullBufferBuilder::new(DEFAULT_CAPACITY),
        Box::new(Decoder::Int32(Vec::with_capacity(DEFAULT_CAPACITY))),
        NullablePlan::ReadTag,
    );
    for literal in [AvroLiteral::Null, AvroLiteral::Int(11)] {
        dec.append_default(&literal).unwrap();
    }
    let flushed = dec.flush(None).unwrap();
    let ints = flushed.as_any().downcast_ref::<Int32Array>().unwrap();
    assert_eq!(ints.len(), 2);
    assert!(ints.is_null(0));
    assert_eq!(ints.value(1), 11);
}
4647
/// An array default of three ints becomes a single list row holding 1, 2, 3.
#[test]
fn test_default_append_array_of_ints() {
    let item_type = Arc::new(avro_from_codec(Codec::Int32));
    let list_dt = avro_from_codec(Codec::List(item_type));
    let mut dec = Decoder::try_new(&list_dt).unwrap();
    let default_items: Vec<AvroLiteral> = vec![
        AvroLiteral::Int(1),
        AvroLiteral::Int(2),
        AvroLiteral::Int(3),
    ];
    dec.append_default(&AvroLiteral::Array(default_items))
        .unwrap();
    let flushed = dec.flush(None).unwrap();
    let list = flushed.as_any().downcast_ref::<ListArray>().unwrap();
    assert_eq!(list.len(), 1);
    assert_eq!(list.value_length(0), 3);
    let items = list.values().as_any().downcast_ref::<Int32Array>().unwrap();
    assert_eq!(items.values(), &[1, 2, 3]);
}
4665
/// A map default with two string->int entries becomes a single map row whose
/// keys and values are compared as sets (entry order is not asserted).
#[test]
fn test_default_append_map_string_to_int() {
    let value_type = Arc::new(avro_from_codec(Codec::Int32));
    let map_dt = avro_from_codec(Codec::Map(value_type));
    let mut dec = Decoder::try_new(&map_dt).unwrap();
    let mut default_entries: IndexMap<String, AvroLiteral> = IndexMap::new();
    for (key, value) in [("k1", 10), ("k2", 20)] {
        default_entries.insert(key.to_string(), AvroLiteral::Int(value));
    }
    dec.append_default(&AvroLiteral::Map(default_entries))
        .unwrap();
    let flushed = dec.flush(None).unwrap();
    let map = flushed.as_any().downcast_ref::<MapArray>().unwrap();
    assert_eq!(map.len(), 1);
    assert_eq!(map.value_length(0), 2);
    let first_row = map.value(0);
    let entries = first_row.as_any().downcast_ref::<StructArray>().unwrap();
    let key_column = entries.column_by_name("key").unwrap();
    let keys = key_column
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    let value_column = entries.column_by_name("value").unwrap();
    let values = value_column
        .as_any()
        .downcast_ref::<Int32Array>()
        .unwrap();
    let seen_keys: std::collections::HashSet<&str> =
        (0..keys.len()).map(|i| keys.value(i)).collect();
    assert_eq!(seen_keys, ["k1", "k2"].into_iter().collect());
    let seen_values: std::collections::HashSet<i32> =
        (0..values.len()).map(|i| values.value(i)).collect();
    assert_eq!(seen_values, [10, 20].into_iter().collect());
}
4697
/// An enum default given by symbol name is stored as that symbol's index in
/// the decoder's symbol list ("B" -> key 1).
#[test]
fn test_default_append_enum_by_symbol() {
    let symbols: Arc<[String]> = vec!["A".into(), "B".into(), "C".into()].into();
    let mut dec = Decoder::Enum(Vec::with_capacity(DEFAULT_CAPACITY), symbols.clone(), None);
    let default_symbol = AvroLiteral::Enum("B".into());
    dec.append_default(&default_symbol).unwrap();
    let flushed = dec.flush(None).unwrap();
    let dict = flushed
        .as_any()
        .downcast_ref::<DictionaryArray<Int32Type>>()
        .unwrap();
    assert_eq!(dict.len(), 1);
    let expected_keys = Int32Array::from(vec![1]);
    assert_eq!(dict.keys(), &expected_keys);
    let dict_values = dict.values();
    let names = dict_values
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    assert_eq!(names.value(1), "B");
}
4718
/// A uuid default must be a string literal: a valid uuid string yields one
/// 16-byte fixed-size value, while a bytes literal is rejected.
#[test]
fn test_default_append_uuid_and_type_error() {
    // Happy path: textual uuid.
    let uuid_str = "123e4567-e89b-12d3-a456-426614174000";
    let mut dec = Decoder::Uuid(Vec::with_capacity(DEFAULT_CAPACITY));
    let uuid_literal = AvroLiteral::String(uuid_str.into());
    dec.append_default(&uuid_literal).unwrap();
    let flushed = dec.flush(None).unwrap();
    let fixed = flushed
        .as_any()
        .downcast_ref::<FixedSizeBinaryArray>()
        .unwrap();
    assert_eq!(fixed.value_length(), 16);
    assert_eq!(fixed.len(), 1);

    // Error path: raw bytes are not accepted as a uuid default.
    let mut rejecting_dec = Decoder::Uuid(Vec::with_capacity(DEFAULT_CAPACITY));
    let raw_bytes = AvroLiteral::Bytes(vec![0u8; 16]);
    let err = rejecting_dec.append_default(&raw_bytes).unwrap_err();
    assert!(
        err.to_string().contains("Default for uuid must be string"),
        "unexpected error: {err:?}"
    );
}
4741
/// A fixed(4) decoder accepts a 4-byte default and rejects a 3-byte default
/// with a "Fixed default length" error.
#[test]
fn test_default_append_fixed_and_length_mismatch() {
    // Happy path: length matches the fixed size.
    let mut dec = Decoder::Fixed(4, Vec::with_capacity(DEFAULT_CAPACITY));
    let exact = AvroLiteral::Bytes(vec![1, 2, 3, 4]);
    dec.append_default(&exact).unwrap();
    let flushed = dec.flush(None).unwrap();
    let fixed = flushed
        .as_any()
        .downcast_ref::<FixedSizeBinaryArray>()
        .unwrap();
    assert_eq!(fixed.value_length(), 4);
    assert_eq!(fixed.value(0), &[1, 2, 3, 4]);

    // Error path: one byte short.
    let mut rejecting_dec = Decoder::Fixed(4, Vec::with_capacity(DEFAULT_CAPACITY));
    let too_short = AvroLiteral::Bytes(vec![1, 2, 3]);
    let err = rejecting_dec.append_default(&too_short).unwrap_err();
    assert!(
        err.to_string().contains("Fixed default length"),
        "unexpected error: {err:?}"
    );
}
4763
/// A duration default is 12 bytes (three little-endian `u32`s). The values
/// 1, 2, 3 must surface as months = 1, days = 2, nanoseconds = 3_000_000; an
/// 11-byte default is rejected.
#[test]
fn test_default_append_duration_and_length_validation() {
    // Happy path: three little-endian u32 components.
    let interval_dt = avro_from_codec(Codec::Interval);
    let mut dec = Decoder::try_new(&interval_dt).unwrap();
    let encoded: Vec<u8> = [1u32, 2u32, 3u32]
        .iter()
        .flat_map(|component| component.to_le_bytes())
        .collect();
    dec.append_default(&AvroLiteral::Bytes(encoded)).unwrap();
    let flushed = dec.flush(None).unwrap();
    let intervals = flushed
        .as_any()
        .downcast_ref::<IntervalMonthDayNanoArray>()
        .unwrap();
    assert_eq!(intervals.len(), 1);
    let first = intervals.value(0);
    assert_eq!(first.months, 1);
    assert_eq!(first.days, 2);
    assert_eq!(first.nanoseconds, 3_000_000);

    // Error path: one byte short of the required 12.
    let mut rejecting_dec = Decoder::try_new(&avro_from_codec(Codec::Interval)).unwrap();
    let too_short = AvroLiteral::Bytes(vec![0u8; 11]);
    let err = rejecting_dec.append_default(&too_short).unwrap_err();
    assert!(
        err.to_string()
            .contains("Duration default must be exactly 12 bytes"),
        "unexpected error: {err:?}"
    );
}
4793
/// 32-byte big-endian two's-complement defaults are appended to a
/// decimal(50, 2) decoder backed by `Decimal256Array`: 0x3039 (12345) renders
/// as "123.45" and 0xFF..85 (-123) as "-1.23".
#[test]
fn test_default_append_decimal256_from_bytes() {
    let decimal_dt = avro_from_codec(Codec::Decimal(50, Some(2), Some(32)));
    let mut dec = Decoder::try_new(&decimal_dt).unwrap();

    // Positive value: all zero bytes except the trailing 0x30 0x39.
    let mut positive = [0x00u8; 32];
    positive[30] = 0x30;
    positive[31] = 0x39;
    dec.append_default(&AvroLiteral::Bytes(positive.to_vec()))
        .unwrap();

    // Negative value: sign-extended 0xFF bytes ending in 0x85.
    let mut negative = [0xFFu8; 32];
    negative[31] = 0x85;
    dec.append_default(&AvroLiteral::Bytes(negative.to_vec()))
        .unwrap();

    let flushed = dec.flush(None).unwrap();
    let decimals = flushed
        .as_any()
        .downcast_ref::<Decimal256Array>()
        .unwrap();
    assert_eq!(decimals.len(), 2);
    assert_eq!(decimals.value_as_string(0), "123.45");
    assert_eq!(decimals.value_as_string(1), "-1.23");
}
4816
/// A map default that only supplies field `a` must leave field `b` to be
/// filled from the projector's per-field default ("hi").
#[test]
fn test_record_append_default_map_missing_fields_uses_projector_field_defaults() {
    let mut record = make_record_decoder_with_projector_defaults(
        &[("a", DataType::Int32, false), ("b", DataType::Utf8, false)],
        vec![None, Some(AvroLiteral::String("hi".into()))],
        vec![],
        0,
    );

    // Only `a` is present in the literal; `b` is intentionally omitted.
    let mut partial: IndexMap<String, AvroLiteral> = IndexMap::new();
    partial.insert(String::from("a"), AvroLiteral::Int(7));
    record.append_default(&AvroLiteral::Map(partial)).unwrap();

    let flushed = record.flush(None).unwrap();
    let row = flushed.as_any().downcast_ref::<StructArray>().unwrap();

    let column_a = row.column_by_name("a").unwrap();
    let ints = column_a.as_any().downcast_ref::<Int32Array>().unwrap();
    assert_eq!(ints.value(0), 7);

    let column_b = row.column_by_name("b").unwrap();
    let strings = column_b.as_any().downcast_ref::<StringArray>().unwrap();
    assert_eq!(strings.value(0), "hi");
}
4846
/// Appending `AvroLiteral::Null` to a record decoder must populate every
/// field from the projector's per-field defaults (5 and "x" here).
#[test]
fn test_record_append_default_null_uses_projector_field_defaults() {
    let per_field_defaults = vec![
        Some(AvroLiteral::Int(5)),
        Some(AvroLiteral::String("x".into())),
    ];
    let mut record = make_record_decoder_with_projector_defaults(
        &[("a", DataType::Int32, false), ("b", DataType::Utf8, false)],
        per_field_defaults,
        vec![],
        0,
    );
    record.append_default(&AvroLiteral::Null).unwrap();

    let flushed = record.flush(None).unwrap();
    let row = flushed.as_any().downcast_ref::<StructArray>().unwrap();

    let ints = row
        .column_by_name("a")
        .unwrap()
        .as_any()
        .downcast_ref::<Int32Array>()
        .unwrap();
    assert_eq!(ints.value(0), 5);

    let strings = row
        .column_by_name("b")
        .unwrap()
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    assert_eq!(strings.value(0), "x");
}
4877
/// With nullable child decoders and no projector defaults, a map default
/// that supplies only `a` must yield a valid `a` (9) and a null `b`.
#[test]
fn test_record_append_default_missing_fields_without_projector_defaults_yields_type_nulls_or_empties()
{
    let field_refs: Vec<FieldRef> = [("a", DataType::Int32, true), ("b", DataType::Utf8, true)]
        .into_iter()
        .map(|(name, dt, nullable)| Arc::new(ArrowField::new(name, dt, nullable)))
        .collect();

    let nullable_int = Decoder::Nullable(
        Nullability::NullSecond,
        NullBufferBuilder::new(DEFAULT_CAPACITY),
        Box::new(Decoder::Int32(Vec::with_capacity(DEFAULT_CAPACITY))),
        NullablePlan::ReadTag,
    );
    let nullable_utf8 = Decoder::Nullable(
        Nullability::NullSecond,
        NullBufferBuilder::new(DEFAULT_CAPACITY),
        Box::new(Decoder::String(
            OffsetBufferBuilder::new(DEFAULT_CAPACITY),
            Vec::with_capacity(DEFAULT_CAPACITY),
        )),
        NullablePlan::ReadTag,
    );
    let child_decoders: Vec<Decoder> = vec![nullable_int, nullable_utf8];

    // Neither field carries a projector default and nothing is injected.
    let projector = Projector {
        writer_to_reader: Arc::from(vec![]),
        skip_decoders: vec![],
        field_defaults: vec![None, None],
        default_injections: Arc::from(Vec::<(usize, AvroLiteral)>::new()),
    };
    let mut record = Decoder::Record(field_refs.into(), child_decoders, Some(projector));

    let mut partial: IndexMap<String, AvroLiteral> = IndexMap::new();
    partial.insert(String::from("a"), AvroLiteral::Int(9));
    record.append_default(&AvroLiteral::Map(partial)).unwrap();

    let flushed = record.flush(None).unwrap();
    let row = flushed.as_any().downcast_ref::<StructArray>().unwrap();
    let ints = row
        .column_by_name("a")
        .unwrap()
        .as_any()
        .downcast_ref::<Int32Array>()
        .unwrap();
    let strings = row
        .column_by_name("b")
        .unwrap()
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    assert!(ints.is_valid(0));
    assert_eq!(ints.value(0), 9);
    assert!(strings.is_null(0));
}
4932
/// Decoding from an empty cursor must still produce a row when the projector
/// injects defaults for both fields (`id = 99`, `name = "alice"`).
#[test]
fn test_projector_default_injection_when_writer_lacks_fields() {
    let injected = vec![
        (0, AvroLiteral::Int(99)),
        (1, AvroLiteral::String("alice".into())),
    ];
    let mut record = make_record_decoder_with_projector_defaults(
        &[
            ("id", DataType::Int32, false),
            ("name", DataType::Utf8, false),
        ],
        vec![None, None],
        injected,
        0,
    );

    // No input bytes at all: every value has to come from the injections.
    let mut empty_cursor = AvroCursor::new(&[]);
    record.decode(&mut empty_cursor).unwrap();

    let flushed = record.flush(None).unwrap();
    let row = flushed.as_any().downcast_ref::<StructArray>().unwrap();

    let id_column = row.column_by_name("id").unwrap();
    let ids = id_column.as_any().downcast_ref::<Int32Array>().unwrap();
    assert_eq!(ids.value(0), 99);

    let name_column = row.column_by_name("name").unwrap();
    let names = name_column
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    assert_eq!(names.value(0), "alice");
}
4967
/// Decodes two dense-union values (branch index 1, then branch index 0) and
/// checks that the reported type ids are the ones declared in `UnionFields`
/// (7 and 42), not the positional child indexes.
#[test]
fn union_type_ids_are_not_child_indexes() {
    let branch_types: Vec<AvroDataType> =
        vec![avro_from_codec(Codec::Int32), avro_from_codec(Codec::Utf8)];
    let int_field = Arc::new(ArrowField::new("a", DataType::Int32, true));
    let utf8_field = Arc::new(ArrowField::new("b", DataType::Utf8, true));
    let fields: UnionFields = [(42_i8, int_field), (7_i8, utf8_field)]
        .into_iter()
        .collect();
    let union_type = avro_from_codec(Codec::Union(
        branch_types.into(),
        fields.clone(),
        UnionMode::Dense,
    ));
    let mut decoder = Decoder::try_new(&union_type).expect("decoder");

    // First value: branch 1 carrying the string "hi".
    let mut utf8_value = encode_avro_long(1);
    utf8_value.extend(encode_avro_bytes("hi".as_bytes()));
    decoder
        .decode(&mut AvroCursor::new(&utf8_value))
        .expect("decode b1");

    // Second value: branch 0 carrying the int 5.
    let mut int_value = encode_avro_long(0);
    int_value.extend(encode_avro_int(5));
    decoder
        .decode(&mut AvroCursor::new(&int_value))
        .expect("decode b0");

    let flushed = decoder.flush(None).expect("flush");
    let union = flushed
        .as_any()
        .downcast_ref::<UnionArray>()
        .expect("union");
    assert_eq!(union.len(), 2);
    for (row, expected_type_id) in [7_i8, 42_i8].into_iter().enumerate() {
        assert_eq!(
            union.type_id(row),
            expected_type_id,
            "type id must come from UnionFields"
        );
    }
    for row in 0..2 {
        assert_eq!(union.value_offset(row), 0);
    }

    let strings = union
        .child(7)
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    assert_eq!(strings.len(), 1);
    assert_eq!(strings.value(0), "hi");

    let ints = union
        .child(42)
        .as_any()
        .downcast_ref::<Int32Array>()
        .unwrap();
    assert_eq!(ints.len(), 1);
    assert_eq!(ints.value(0), 5);

    let declared_ids: Vec<i8> = fields.iter().map(|entry| entry.0).collect();
    assert_eq!(declared_ids, vec![42_i8, 7_i8]);
}
5006
/// Every custom duration codec must be mapped by `Skipper::from_avro` to
/// `Skipper::Int64`.
#[cfg(feature = "avro_custom_types")]
#[test]
fn skipper_from_avro_maps_custom_duration_variants_to_int64() -> Result<(), AvroError> {
    let duration_codecs = [
        Codec::DurationNanos,
        Codec::DurationMicros,
        Codec::DurationMillis,
        Codec::DurationSeconds,
    ];
    for codec in duration_codecs {
        let dt = make_avro_dt(codec.clone(), None);
        let skipper = Skipper::from_avro(&dt)?;
        if !matches!(skipper, Skipper::Int64) {
            panic!("expected Int64 skipper for {:?}, got {:?}", codec, skipper);
        }
    }
    Ok(())
}
5025
/// For each custom duration codec, skipping a single encoded long must
/// advance the cursor by exactly the encoded length of that long.
#[cfg(feature = "avro_custom_types")]
#[test]
fn skipper_skip_consumes_one_long_for_custom_durations() -> Result<(), AvroError> {
    let samples: [i64; 7] = [0, 1, -1, 150, -150, i64::MAX / 3, i64::MIN / 3];
    let duration_codecs = [
        Codec::DurationNanos,
        Codec::DurationMicros,
        Codec::DurationMillis,
        Codec::DurationSeconds,
    ];
    for codec in duration_codecs {
        let dt = make_avro_dt(codec.clone(), None);
        let mut skipper = Skipper::from_avro(&dt)?;
        for v in samples {
            let encoded = encode_avro_long(v);
            let mut cursor = AvroCursor::new(&encoded);
            skipper.skip(&mut cursor)?;
            let consumed = cursor.position();
            assert_eq!(
                consumed,
                encoded.len(),
                "did not consume all bytes for {:?} value {}",
                codec,
                v
            );
        }
    }
    Ok(())
}
5053
/// A `NullFirst` nullable duration skipper wraps `Skipper::Int64`; tag 0
/// consumes only the tag byte, while tag 1 followed by long(0) consumes two
/// bytes.
#[cfg(feature = "avro_custom_types")]
#[test]
fn skipper_nullable_custom_duration_respects_null_first() -> Result<(), AvroError> {
    let dt = make_avro_dt(Codec::DurationNanos, Some(Nullability::NullFirst));
    let mut s = Skipper::from_avro(&dt)?;

    // Shape check: Nullable(NullFirst, Int64).
    match &s {
        Skipper::Nullable(Nullability::NullFirst, inner) => {
            let wrapped: &Skipper = inner;
            if !matches!(wrapped, Skipper::Int64) {
                panic!("expected inner Int64, got {:?}", wrapped);
            }
        }
        other => panic!("expected Nullable(NullFirst, Int64), got {:?}", other),
    }

    // Tag 0 alone.
    let tag_only = encode_vlq_u64(0);
    let mut tag_cursor = AvroCursor::new(&tag_only);
    s.skip(&mut tag_cursor)?;
    assert_eq!(tag_cursor.position(), 1, "expected to consume only tag=0");

    // Tag 1 followed by a long payload.
    let mut tag_and_value = encode_vlq_u64(1);
    tag_and_value.extend(encode_avro_long(0));
    let mut value_cursor = AvroCursor::new(&tag_and_value);
    s.skip(&mut value_cursor)?;
    assert_eq!(
        value_cursor.position(),
        2,
        "expected to consume tag=1 + long(0)"
    );

    Ok(())
}
5082
/// A `NullSecond` nullable duration skipper wraps `Skipper::Int64`; tag 1
/// consumes only the tag byte, while tag 0 followed by long(-1) consumes the
/// tag plus the encoded long.
#[cfg(feature = "avro_custom_types")]
#[test]
fn skipper_nullable_custom_duration_respects_null_second() -> Result<(), AvroError> {
    let dt = make_avro_dt(Codec::DurationMicros, Some(Nullability::NullSecond));
    let mut s = Skipper::from_avro(&dt)?;

    // Shape check: Nullable(NullSecond, Int64).
    match &s {
        Skipper::Nullable(Nullability::NullSecond, inner) => {
            let wrapped: &Skipper = inner;
            if !matches!(wrapped, Skipper::Int64) {
                panic!("expected inner Int64, got {:?}", wrapped);
            }
        }
        other => panic!("expected Nullable(NullSecond, Int64), got {:?}", other),
    }

    // Tag 1 alone.
    let tag_only = encode_vlq_u64(1);
    let mut tag_cursor = AvroCursor::new(&tag_only);
    s.skip(&mut tag_cursor)?;
    assert_eq!(tag_cursor.position(), 1, "expected to consume only tag=1");

    // Tag 0 followed by a long payload.
    let payload = encode_avro_long(-1);
    let expected_consumed = 1 + payload.len();
    let mut tag_and_value = encode_vlq_u64(0);
    tag_and_value.extend(payload);
    let mut value_cursor = AvroCursor::new(&tag_and_value);
    s.skip(&mut value_cursor)?;
    assert_eq!(
        value_cursor.position(),
        expected_consumed,
        "expected to consume tag=0 + long(-1)"
    );
    Ok(())
}
5114
/// `Codec::Interval` must map to `Skipper::DurationFixed12`, and skipping
/// must advance the cursor by 12 bytes.
#[test]
fn skipper_interval_is_fixed12_and_skips_12_bytes() -> Result<(), AvroError> {
    let dt = make_avro_dt(Codec::Interval, None);
    let mut skipper = Skipper::from_avro(&dt)?;
    if !matches!(skipper, Skipper::DurationFixed12) {
        panic!("expected DurationFixed12, got {:?}", skipper);
    }

    let twelve_zeroes = vec![0u8; 12];
    let mut cursor = AvroCursor::new(&twelve_zeroes);
    skipper.skip(&mut cursor)?;
    let consumed = cursor.position();
    assert_eq!(consumed, 12, "expected to consume 12 fixed bytes");
    Ok(())
}
5129
/// Feeds the ints 1,1,1,2,2,3,3,3,3 to a run-end-encoded decoder declared
/// with width 16 and expects a `RunArray<Int16Type>` with run ends
/// `[3, 5, 9]` and values `[1, 2, 3]`.
#[cfg(feature = "avro_custom_types")]
#[test]
fn test_run_end_encoded_width16_int32_basic_grouping() {
    use arrow_array::RunArray;
    use std::sync::Arc;

    let value_type = Arc::new(avro_from_codec(Codec::Int32));
    let ree_type = AvroDataType::new(
        Codec::RunEndEncoded(value_type, 16),
        Default::default(),
        None,
    );
    let mut decoder = Decoder::try_new(&ree_type).expect("create REE decoder");

    // (value, repetitions) pairs expanding to 1,1,1,2,2,3,3,3,3.
    for (value, repetitions) in [(1, 3), (2, 2), (3, 4)] {
        for _ in 0..repetitions {
            let encoded = encode_avro_int(value);
            decoder
                .decode(&mut AvroCursor::new(&encoded))
                .expect("decode");
        }
    }

    let flushed = decoder.flush(None).expect("flush");
    let runs = flushed
        .as_any()
        .downcast_ref::<RunArray<Int16Type>>()
        .expect("RunArray<Int16Type>");
    assert_eq!(runs.len(), 9);
    assert_eq!(runs.run_ends().values(), &[3, 5, 9]);

    let run_values = runs
        .values()
        .as_ref()
        .as_any()
        .downcast_ref::<Int32Array>()
        .expect("values Int32");
    assert_eq!(run_values.values(), &[1, 2, 3]);
}
5161
/// Run-end encoding over a `NullSecond` nullable Int32 must group consecutive
/// nulls into runs just like consecutive equal values: the sequence
/// null,null,7,7,7,null,5,5 yields run ends `[2, 5, 6, 8]` and values
/// `[null, 7, null, 5]`.
#[cfg(feature = "avro_custom_types")]
#[test]
fn test_run_end_encoded_width32_nullable_values_group_nulls() {
    use arrow_array::RunArray;
    use std::sync::Arc;

    let nullable_int = AvroDataType::new(
        Codec::Int32,
        Default::default(),
        Some(Nullability::NullSecond),
    );
    let ree_type = AvroDataType::new(
        Codec::RunEndEncoded(Arc::new(nullable_int), 32),
        Default::default(),
        None,
    );
    let mut decoder = Decoder::try_new(&ree_type).expect("create REE decoder");

    let sequence: [Option<i32>; 8] = [
        None,
        None,
        Some(7),
        Some(7),
        Some(7),
        None,
        Some(5),
        Some(5),
    ];
    for entry in sequence {
        let mut encoded = Vec::new();
        if let Some(v) = entry {
            // Tag 0 followed by the int payload.
            encoded.extend_from_slice(&encode_vlq_u64(0));
            encoded.extend_from_slice(&encode_avro_int(v));
        } else {
            // Tag 1 with no payload.
            encoded.extend_from_slice(&encode_vlq_u64(1));
        }
        decoder
            .decode(&mut AvroCursor::new(&encoded))
            .expect("decode");
    }

    let flushed = decoder.flush(None).expect("flush");
    let runs = flushed
        .as_any()
        .downcast_ref::<RunArray<Int32Type>>()
        .expect("RunArray<Int32Type>");
    assert_eq!(runs.len(), 8);
    assert_eq!(runs.run_ends().values(), &[2, 5, 6, 8]);

    let run_values = runs
        .values()
        .as_ref()
        .as_any()
        .downcast_ref::<Int32Array>()
        .expect("values Int32 (nullable)");
    assert_eq!(run_values.len(), 4);
    for (slot, expected) in [None, Some(7), None, Some(5)].into_iter().enumerate() {
        match expected {
            Some(v) => assert_eq!(run_values.value(slot), v),
            None => assert!(run_values.is_null(slot)),
        }
    }
}
5218
/// Wraps a hand-built run-end-encoded Float64 decoder in a
/// `Nullable`/`FromSingle` plan with `Promotion::IntToDouble`, feeds it the
/// encoded ints 1,1,2,2,2, and expects a `RunArray<Int64Type>` with run ends
/// `[2, 5]` and values `[1.0, 2.0]`.
#[cfg(feature = "avro_custom_types")]
#[test]
fn test_run_end_encoded_decode_with_promotion_int_to_double_via_nullable_from_single() {
    use arrow_array::RunArray;

    let float_values = Box::new(Decoder::Float64(Vec::with_capacity(DEFAULT_CAPACITY)));
    let run_end_decoder = Decoder::RunEndEncoded(8, 0, float_values);
    let plan = NullablePlan::FromSingle {
        promotion: Promotion::IntToDouble,
    };
    let mut decoder = Decoder::Nullable(
        Nullability::NullSecond,
        NullBufferBuilder::new(DEFAULT_CAPACITY),
        Box::new(run_end_decoder),
        plan,
    );

    // (value, repetitions) pairs expanding to 1,1,2,2,2.
    for (value, repetitions) in [(1, 2), (2, 3)] {
        for _ in 0..repetitions {
            let encoded = encode_avro_int(value);
            decoder
                .decode(&mut AvroCursor::new(&encoded))
                .expect("decode");
        }
    }

    let flushed = decoder.flush(None).expect("flush");
    let runs = flushed
        .as_any()
        .downcast_ref::<RunArray<Int64Type>>()
        .expect("RunArray<Int64Type>");
    assert_eq!(runs.len(), 5);
    assert_eq!(runs.run_ends().values(), &[2, 5]);

    let run_values = runs
        .values()
        .as_ref()
        .as_any()
        .downcast_ref::<Float64Array>()
        .expect("values Float64");
    assert_eq!(run_values.values(), &[1.0, 2.0]);
}
5256
/// A run-end width of 3 must be rejected by `Decoder::try_new`, with an error
/// message that names the supported widths.
#[cfg(feature = "avro_custom_types")]
#[test]
fn test_run_end_encoded_unsupported_run_end_width_errors() {
    use std::sync::Arc;

    let value_type = Arc::new(avro_from_codec(Codec::Int32));
    let bad_width_type = AvroDataType::new(
        Codec::RunEndEncoded(value_type, 3),
        Default::default(),
        None,
    );
    let msg = Decoder::try_new(&bad_width_type)
        .expect_err("must reject unsupported width")
        .to_string();

    let required_fragments = [
        "Unsupported run-end width",
        "16/32/64 bits or 2/4/8 bytes",
    ];
    let has_all_fragments = required_fragments
        .into_iter()
        .all(|fragment| msg.contains(fragment));
    assert!(has_all_fragments, "unexpected error message: {msg}");
}
5275
/// Flushing a run-end-encoded decoder (width 4, Utf8 values) that never
/// decoded anything must yield an empty `RunArray<Int32Type>`.
#[cfg(feature = "avro_custom_types")]
#[test]
fn test_run_end_encoded_empty_input_is_empty_runarray() {
    use arrow_array::RunArray;
    use std::sync::Arc;

    let value_type = Arc::new(avro_from_codec(Codec::Utf8));
    let ree_type = AvroDataType::new(
        Codec::RunEndEncoded(value_type, 4),
        Default::default(),
        None,
    );
    let mut decoder = Decoder::try_new(&ree_type).expect("create REE decoder");

    // Flush immediately, without decoding any value.
    let flushed = decoder.flush(None).expect("flush");
    let runs = flushed
        .as_any()
        .downcast_ref::<RunArray<Int32Type>>()
        .expect("RunArray<Int32Type>");
    assert_eq!(runs.len(), 0);
    assert_eq!(runs.run_ends().len(), 0);
    assert_eq!(runs.values().len(), 0);
}
5297
/// Feeds the strings a,a,bb,bb,bb,a to a width-32 run-end-encoded Utf8
/// decoder and expects run ends `[2, 5, 6]` with values `["a", "bb", "a"]`.
#[cfg(feature = "avro_custom_types")]
#[test]
fn test_run_end_encoded_strings_grouping_width32_bits() {
    use arrow_array::RunArray;
    use std::sync::Arc;

    let value_type = Arc::new(avro_from_codec(Codec::Utf8));
    let ree_type = AvroDataType::new(
        Codec::RunEndEncoded(value_type, 32),
        Default::default(),
        None,
    );
    let mut decoder = Decoder::try_new(&ree_type).expect("create REE decoder");

    // (text, repetitions) pairs expanding to a,a,bb,bb,bb,a.
    for (text, repetitions) in [("a", 2), ("bb", 3), ("a", 1)] {
        for _ in 0..repetitions {
            let encoded = encode_avro_bytes(text.as_bytes());
            decoder
                .decode(&mut AvroCursor::new(&encoded))
                .expect("decode");
        }
    }

    let flushed = decoder.flush(None).expect("flush");
    let runs = flushed
        .as_any()
        .downcast_ref::<RunArray<Int32Type>>()
        .expect("RunArray<Int32Type>");
    assert_eq!(runs.run_ends().values(), &[2, 5, 6]);

    let run_values = runs
        .values()
        .as_ref()
        .as_any()
        .downcast_ref::<StringArray>()
        .expect("values String");
    assert_eq!(run_values.len(), 3);
    for (slot, expected) in ["a", "bb", "a"].into_iter().enumerate() {
        assert_eq!(run_values.value(slot), expected);
    }
}
5331
/// Smoke test compiled only when `avro_custom_types` is disabled: a plain
/// Int32 decoder must decode 1, 2, 3 into an `Int32Array`.
#[cfg(not(feature = "avro_custom_types"))]
#[test]
fn test_no_custom_types_feature_smoke_decodes_plain_int32() {
    let int_type = avro_from_codec(Codec::Int32);
    let mut decoder = Decoder::try_new(&int_type).expect("create Int32 decoder");
    [1, 2, 3].into_iter().for_each(|v| {
        let encoded = encode_avro_int(v);
        let mut cursor = AvroCursor::new(&encoded);
        decoder.decode(&mut cursor).expect("decode");
    });
    let flushed = decoder.flush(None).expect("flush");
    let ints = flushed
        .as_any()
        .downcast_ref::<Int32Array>()
        .expect("Int32Array");
    assert_eq!(ints.values(), &[1, 2, 3]);
}
5348
/// `Codec::TimestampNanos(true)` must decode longs into a
/// `TimestampNanosecondArray` whose data type carries the "+00:00" timezone.
#[test]
fn test_timestamp_nanos_decoding_utc() {
    let avro_type = avro_from_codec(Codec::TimestampNanos(true));
    let mut decoder = Decoder::try_new(&avro_type).expect("create TimestampNanos decoder");

    // All four values are laid out back to back in a single buffer.
    let samples = [0_i64, 1_i64, -1_i64, 1_234_567_890_i64];
    let mut encoded = Vec::new();
    for v in samples {
        encoded.extend_from_slice(&encode_avro_long(v));
    }
    let mut cursor = AvroCursor::new(&encoded);
    for _ in 0..4 {
        decoder.decode(&mut cursor).expect("decode nanos ts");
    }

    let flushed = decoder.flush(None).expect("flush nanos ts");
    let timestamps = flushed
        .as_any()
        .downcast_ref::<TimestampNanosecondArray>()
        .expect("TimestampNanosecondArray");
    assert_eq!(timestamps.values(), &[0, 1, -1, 1_234_567_890]);

    let other = timestamps.data_type();
    if let DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, tz) = other {
        assert_eq!(tz.as_deref(), Some("+00:00"));
    } else {
        panic!("expected Timestamp(Nanosecond, Some(\"+00:00\")), got {other:?}");
    }
}
5374
/// `Codec::TimestampNanos(false)` must decode longs into a
/// `TimestampNanosecondArray` whose data type has no timezone.
#[test]
fn test_timestamp_nanos_decoding_local() {
    let avro_type = avro_from_codec(Codec::TimestampNanos(false));
    let mut decoder = Decoder::try_new(&avro_type).expect("create TimestampNanos decoder");

    // All three values are laid out back to back in a single buffer.
    let samples = [10_i64, 20_i64, -30_i64];
    let mut encoded = Vec::new();
    for v in samples {
        encoded.extend_from_slice(&encode_avro_long(v));
    }
    let mut cursor = AvroCursor::new(&encoded);
    for _ in 0..3 {
        decoder.decode(&mut cursor).expect("decode nanos ts");
    }

    let flushed = decoder.flush(None).expect("flush nanos ts");
    let timestamps = flushed
        .as_any()
        .downcast_ref::<TimestampNanosecondArray>()
        .expect("TimestampNanosecondArray");
    assert_eq!(timestamps.values(), &[10, 20, -30]);

    let other = timestamps.data_type();
    if let DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, tz) = other {
        assert_eq!(tz.as_deref(), None);
    } else {
        panic!("expected Timestamp(Nanosecond, None), got {other:?}");
    }
}
5400
/// A `NullFirst` nullable `TimestampNanos(false)` decoder must turn the
/// encoded stream long(1), long(42), long(0), long(1), long(-7) into three
/// rows: 42, null, -7, with no timezone on the resulting data type.
#[test]
fn test_timestamp_nanos_decoding_with_nulls() {
    let avro_type = AvroDataType::new(
        Codec::TimestampNanos(false),
        Default::default(),
        Some(Nullability::NullFirst),
    );
    let mut decoder = Decoder::try_new(&avro_type).expect("create nullable TimestampNanos");

    // Encoded longs written back to back, in the same order as the original stream.
    let mut encoded = Vec::new();
    for word in [1_i64, 42, 0, 1, -7] {
        encoded.extend_from_slice(&encode_avro_long(word));
    }
    let mut cursor = AvroCursor::new(&encoded);
    for _ in 0..3 {
        decoder
            .decode(&mut cursor)
            .expect("decode nullable nanos ts");
    }

    let flushed = decoder.flush(None).expect("flush nullable nanos ts");
    let timestamps = flushed
        .as_any()
        .downcast_ref::<TimestampNanosecondArray>()
        .expect("TimestampNanosecondArray");
    assert_eq!(timestamps.len(), 3);
    assert!(timestamps.is_valid(0));
    assert!(timestamps.is_null(1));
    assert!(timestamps.is_valid(2));
    assert_eq!(timestamps.value(0), 42);
    assert_eq!(timestamps.value(2), -7);

    let other = timestamps.data_type();
    if let DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, tz) = other {
        assert_eq!(tz.as_deref(), None);
    } else {
        panic!("expected Timestamp(Nanosecond, None), got {other:?}");
    }
}
5437}