1use crate::codec::{
21 AvroDataType, AvroField, AvroLiteral, Codec, Promotion, ResolutionInfo, ResolvedRecord,
22 ResolvedUnion,
23};
24use crate::errors::AvroError;
25use crate::reader::cursor::AvroCursor;
26use crate::schema::Nullability;
27#[cfg(feature = "small_decimals")]
28use arrow_array::builder::{Decimal32Builder, Decimal64Builder};
29use arrow_array::builder::{Decimal128Builder, Decimal256Builder, IntervalMonthDayNanoBuilder};
30use arrow_array::types::*;
31use arrow_array::*;
32use arrow_buffer::*;
33#[cfg(feature = "small_decimals")]
34use arrow_schema::{DECIMAL32_MAX_PRECISION, DECIMAL64_MAX_PRECISION};
35use arrow_schema::{
36 DECIMAL128_MAX_PRECISION, DECIMAL256_MAX_PRECISION, DataType, Field as ArrowField, FieldRef,
37 Fields, Schema as ArrowSchema, SchemaRef, UnionFields, UnionMode,
38};
39#[cfg(feature = "avro_custom_types")]
40use arrow_select::take::{TakeOptions, take};
41use std::cmp::Ordering;
42use std::sync::Arc;
43use strum_macros::AsRefStr;
44use uuid::Uuid;
45
/// Initial number of rows (or bytes, for byte buffers) reserved by each column decoder.
const DEFAULT_CAPACITY: usize = 1 << 10;
47
/// How a `Decoder::Nullable` locates the value for each row.
#[derive(Clone, Copy, Debug)]
enum NullablePlan {
    /// A union branch tag precedes every value; the tag decides whether the row is null.
    ReadTag,
    /// The writer wrote a single (non-union) value that resolves to a branch of the
    /// reader's union: no branch tag is read, every row is recorded as valid, and
    /// `promotion` is applied while decoding the value.
    FromSingle { promotion: Promotion },
}
57
/// Reads one big-endian decimal into an `$N`-byte array via `read_decimal_bytes_be`
/// (which receives the optional encoded size `$size`) and appends it to `$builder`
/// as an `$Int`.
macro_rules! decode_decimal {
    ($size:expr, $buf:expr, $builder:expr, $N:expr, $Int:ty) => {{
        $builder.append_value(<$Int>::from_be_bytes(read_decimal_bytes_be::<{ $N }>(
            $buf, $size,
        )?));
    }};
}
65
/// Finishes `$builder` into an `$ArrayTy` decimal array carrying `$nulls`, tagged with
/// the given precision and scale (a missing scale means 0), and returns it as an
/// `ArrayRef`.
macro_rules! flush_decimal {
    ($builder:expr, $precision:expr, $scale:expr, $nulls:expr, $ArrayTy:ty) => {{
        // Only the value buffer is kept; validity comes from `$nulls`.
        let (_, digits, _) = $builder.finish().into_parts();
        let array = <$ArrayTy>::try_new(digits, $nulls)?;
        let precision = *$precision as u8;
        let scale = $scale.unwrap_or(0) as i8;
        Arc::new(array.with_precision_and_scale(precision, scale)?) as ArrayRef
    }};
}
75
/// Appends a decimal default literal to `$builder`.
///
/// The literal must be `AvroLiteral::Bytes` holding a two's-complement big-endian
/// value; it is widened to `$N` bytes with `sign_cast_to` and stored as `$Int`.
/// Evaluates to `Result<(), AvroError>`; `$name` appears in the error message.
macro_rules! append_decimal_default {
    ($lit:expr, $builder:expr, $N:literal, $Int:ty, $name:literal) => {{
        if let AvroLiteral::Bytes(b) = $lit {
            let widened = sign_cast_to::<$N>(b)?;
            $builder.append_value(<$Int>::from_be_bytes(widened));
            Ok(())
        } else {
            Err(AvroError::InvalidArgument(
                concat!(
                    "Default for ",
                    $name,
                    " must be bytes (two's-complement big-endian)"
                )
                .to_string(),
            ))
        }
    }};
}
98
/// Decodes consecutive Avro records into Arrow `RecordBatch`es.
#[derive(Debug)]
pub(crate) struct RecordDecoder {
    /// Arrow schema of the batches returned by `flush`.
    schema: SchemaRef,
    /// One column decoder per reader field, in reader-field order.
    fields: Vec<Decoder>,
    /// Built when the record type carries `ResolutionInfo::Record`; when present, it
    /// drives decoding of every record instead of the plain field-by-field loop.
    projector: Option<Projector>,
}
106
107impl RecordDecoder {
108 pub(crate) fn try_new_with_options(data_type: &AvroDataType) -> Result<Self, AvroError> {
119 match data_type.codec() {
120 Codec::Struct(reader_fields) => {
121 let mut arrow_fields = Vec::with_capacity(reader_fields.len());
123 let mut encodings = Vec::with_capacity(reader_fields.len());
124 for avro_field in reader_fields.iter() {
125 arrow_fields.push(avro_field.field());
126 encodings.push(Decoder::try_new(avro_field.data_type())?);
127 }
128 let projector = match data_type.resolution.as_ref() {
129 Some(ResolutionInfo::Record(rec)) => {
130 Some(ProjectorBuilder::try_new(rec, reader_fields).build()?)
131 }
132 _ => None,
133 };
134 Ok(Self {
135 schema: Arc::new(ArrowSchema::new(arrow_fields)),
136 fields: encodings,
137 projector,
138 })
139 }
140 other => Err(AvroError::ParseError(format!(
141 "Expected record got {other:?}"
142 ))),
143 }
144 }
145
146 pub(crate) fn schema(&self) -> &SchemaRef {
148 &self.schema
149 }
150
151 pub(crate) fn decode(&mut self, buf: &[u8], count: usize) -> Result<usize, AvroError> {
153 let mut cursor = AvroCursor::new(buf);
154 match self.projector.as_mut() {
155 Some(proj) => {
156 for _ in 0..count {
157 proj.project_record(&mut cursor, &mut self.fields)?;
158 }
159 }
160 None => {
161 for _ in 0..count {
162 for field in &mut self.fields {
163 field.decode(&mut cursor)?;
164 }
165 }
166 }
167 }
168 Ok(cursor.position())
169 }
170
171 pub(crate) fn flush(&mut self) -> Result<RecordBatch, AvroError> {
173 let arrays = self
174 .fields
175 .iter_mut()
176 .map(|x| x.flush(None))
177 .collect::<Result<Vec<_>, _>>()?;
178 RecordBatch::try_new(self.schema.clone(), arrays).map_err(Into::into)
179 }
180}
181
/// Maps writer enum symbol indices onto reader symbol indices.
#[derive(Debug)]
struct EnumResolution {
    /// Reader symbol index for each writer symbol index. A negative entry means the
    /// writer symbol has no reader counterpart.
    mapping: Arc<[i32]>,
    /// Reader index used when the writer index is unmapped or out of range. A negative
    /// value means no default exists, and decoding such a symbol is an error.
    default_index: i32,
}
187
/// Column decoder: accumulates decoded Avro values for one Arrow column until flushed.
///
/// The `XToY` variants read writer type `X` and store reader type `Y` (schema
/// promotion). `AsRefStr` supplies the variant name used in promotion-mismatch errors.
#[derive(Debug, AsRefStr)]
enum Decoder {
    /// Avro `null`: only the number of rows is tracked.
    Null(usize),
    Boolean(BooleanBufferBuilder),
    Int32(Vec<i32>),
    Int64(Vec<i64>),
    /// Custom duration types, each value read as an Avro `long`.
    #[cfg(feature = "avro_custom_types")]
    DurationSecond(Vec<i64>),
    #[cfg(feature = "avro_custom_types")]
    DurationMillisecond(Vec<i64>),
    #[cfg(feature = "avro_custom_types")]
    DurationMicrosecond(Vec<i64>),
    #[cfg(feature = "avro_custom_types")]
    DurationNanosecond(Vec<i64>),
    Float32(Vec<f32>),
    Float64(Vec<f64>),
    Date32(Vec<i32>),
    TimeMillis(Vec<i32>),
    TimeMicros(Vec<i64>),
    /// The `bool` is the codec's `is_utc` flag. When it is set, the column is flushed
    /// with a `+00:00` timezone.
    TimestampMillis(bool, Vec<i64>),
    TimestampMicros(bool, Vec<i64>),
    TimestampNanos(bool, Vec<i64>),
    /// Numeric promotions: read the left-hand writer type, store the right-hand type.
    Int32ToInt64(Vec<i64>),
    Int32ToFloat32(Vec<f32>),
    Int32ToFloat64(Vec<f64>),
    Int64ToFloat32(Vec<f32>),
    Int64ToFloat64(Vec<f64>),
    Float32ToFloat64(Vec<f64>),
    /// Writer `bytes` read into a reader string column (offsets + value bytes).
    BytesToString(OffsetBufferBuilder<i32>, Vec<u8>),
    /// Writer `string` read into a reader bytes column (offsets + value bytes).
    StringToBytes(OffsetBufferBuilder<i32>, Vec<u8>),
    /// Variable-width columns: per-row offsets plus concatenated value bytes.
    Binary(OffsetBufferBuilder<i32>, Vec<u8>),
    String(OffsetBufferBuilder<i32>, Vec<u8>),
    StringView(OffsetBufferBuilder<i32>, Vec<u8>),
    /// List: the item field, per-row item offsets, and the item decoder.
    Array(FieldRef, OffsetBufferBuilder<i32>, Box<Decoder>),
    /// Record: child fields, one decoder per child, and an optional projector used
    /// when the record carries `ResolutionInfo::Record`.
    Record(Fields, Vec<Decoder>, Option<Projector>),
    /// Map: the `entries` struct field, key offsets, per-row entry offsets, key bytes,
    /// and the value decoder.
    Map(
        FieldRef,
        OffsetBufferBuilder<i32>,
        OffsetBufferBuilder<i32>,
        Vec<u8>,
        Box<Decoder>,
    ),
    /// Fixed: byte width per value and the concatenated values.
    Fixed(i32, Vec<u8>),
    /// Enum: symbol index per row, reader symbols, and an optional writer-to-reader
    /// symbol mapping.
    Enum(Vec<i32>, Arc<[String]>, Option<EnumResolution>),
    /// Avro `duration` (12 little-endian bytes: months, days, millis) stored as an
    /// Arrow month-day-nano interval.
    Duration(IntervalMonthDayNanoBuilder),
    /// UUIDs parsed from their Avro string form, stored as 16 raw bytes per row.
    Uuid(Vec<u8>),
    /// Decimals: precision, optional scale, optional encoded byte size (presumably the
    /// `fixed` size for fixed-backed decimals — TODO confirm), and the builder for the
    /// narrowest Arrow decimal type covering the precision.
    #[cfg(feature = "small_decimals")]
    Decimal32(usize, Option<usize>, Option<usize>, Decimal32Builder),
    #[cfg(feature = "small_decimals")]
    Decimal64(usize, Option<usize>, Option<usize>, Decimal64Builder),
    Decimal128(usize, Option<usize>, Option<usize>, Decimal128Builder),
    Decimal256(usize, Option<usize>, Option<usize>, Decimal256Builder),
    /// Run-end encoded column: run-end index width in bytes (2/4/8), number of logical
    /// rows appended so far, and the values decoder.
    #[cfg(feature = "avro_custom_types")]
    RunEndEncoded(u8, usize, Box<Decoder>),
    Union(UnionDecoder),
    /// Nullable wrapper: position of the null branch, validity bitmap, inner decoder,
    /// and how each row's branch is determined.
    Nullable(Nullability, NullBufferBuilder, Box<Decoder>, NullablePlan),
}
247
248impl Decoder {
249 fn try_new(data_type: &AvroDataType) -> Result<Self, AvroError> {
250 if let Some(ResolutionInfo::Union(info)) = data_type.resolution.as_ref() {
251 if info.writer_is_union && !info.reader_is_union {
252 let mut clone = data_type.clone();
253 clone.resolution = None; let target = Box::new(Self::try_new_internal(&clone)?);
255 let decoder = Self::Union(
256 UnionDecoderBuilder::new()
257 .with_resolved_union(info.clone())
258 .with_target(target)
259 .build()?,
260 );
261 return Ok(decoder);
262 }
263 }
264 Self::try_new_internal(data_type)
265 }
266
/// Builds the decoder for `data_type`, including promotion-specific decoders and the
/// `Nullable` wrapper for nullable types.
///
/// Unlike `try_new`, this does not collapse a writer union onto a non-union reader.
/// Dense-union branches are built with this function directly.
///
/// # Errors
/// Returns an error for decimal precisions above `DECIMAL256_MAX_PRECISION`, union
/// field/decoder count mismatches, too many union branches, sparse unions, and
/// unsupported run-end widths. Also propagates failures from decimal builders, child
/// decoders, projectors and the union decoder builder.
fn try_new_internal(data_type: &AvroDataType) -> Result<Self, AvroError> {
    // Only a direct promotion resolution influences the decoder choice below.
    let promotion = match data_type.resolution.as_ref() {
        Some(ResolutionInfo::Promotion(p)) => Some(p),
        _ => None,
    };
    // Arm order matters: each promotion arm must come before the generic `(codec, _)`
    // arm for the same codec, which would otherwise match first.
    let decoder = match (data_type.codec(), promotion) {
        (Codec::Int64, Some(Promotion::IntToLong)) => {
            Self::Int32ToInt64(Vec::with_capacity(DEFAULT_CAPACITY))
        }
        (Codec::Float32, Some(Promotion::IntToFloat)) => {
            Self::Int32ToFloat32(Vec::with_capacity(DEFAULT_CAPACITY))
        }
        (Codec::Float64, Some(Promotion::IntToDouble)) => {
            Self::Int32ToFloat64(Vec::with_capacity(DEFAULT_CAPACITY))
        }
        (Codec::Float32, Some(Promotion::LongToFloat)) => {
            Self::Int64ToFloat32(Vec::with_capacity(DEFAULT_CAPACITY))
        }
        (Codec::Float64, Some(Promotion::LongToDouble)) => {
            Self::Int64ToFloat64(Vec::with_capacity(DEFAULT_CAPACITY))
        }
        (Codec::Float64, Some(Promotion::FloatToDouble)) => {
            Self::Float32ToFloat64(Vec::with_capacity(DEFAULT_CAPACITY))
        }
        (Codec::Utf8, Some(Promotion::BytesToString))
        | (Codec::Utf8View, Some(Promotion::BytesToString)) => Self::BytesToString(
            OffsetBufferBuilder::new(DEFAULT_CAPACITY),
            Vec::with_capacity(DEFAULT_CAPACITY),
        ),
        (Codec::Binary, Some(Promotion::StringToBytes)) => Self::StringToBytes(
            OffsetBufferBuilder::new(DEFAULT_CAPACITY),
            Vec::with_capacity(DEFAULT_CAPACITY),
        ),
        (Codec::Null, _) => Self::Null(0),
        (Codec::Boolean, _) => Self::Boolean(BooleanBufferBuilder::new(DEFAULT_CAPACITY)),
        (Codec::Int32, _) => Self::Int32(Vec::with_capacity(DEFAULT_CAPACITY)),
        (Codec::Int64, _) => Self::Int64(Vec::with_capacity(DEFAULT_CAPACITY)),
        (Codec::Float32, _) => Self::Float32(Vec::with_capacity(DEFAULT_CAPACITY)),
        (Codec::Float64, _) => Self::Float64(Vec::with_capacity(DEFAULT_CAPACITY)),
        (Codec::Binary, _) => Self::Binary(
            OffsetBufferBuilder::new(DEFAULT_CAPACITY),
            Vec::with_capacity(DEFAULT_CAPACITY),
        ),
        (Codec::Utf8, _) => Self::String(
            OffsetBufferBuilder::new(DEFAULT_CAPACITY),
            Vec::with_capacity(DEFAULT_CAPACITY),
        ),
        (Codec::Utf8View, _) => Self::StringView(
            OffsetBufferBuilder::new(DEFAULT_CAPACITY),
            Vec::with_capacity(DEFAULT_CAPACITY),
        ),
        (Codec::Date32, _) => Self::Date32(Vec::with_capacity(DEFAULT_CAPACITY)),
        (Codec::TimeMillis, _) => Self::TimeMillis(Vec::with_capacity(DEFAULT_CAPACITY)),
        (Codec::TimeMicros, _) => Self::TimeMicros(Vec::with_capacity(DEFAULT_CAPACITY)),
        (Codec::TimestampMillis(is_utc), _) => {
            Self::TimestampMillis(*is_utc, Vec::with_capacity(DEFAULT_CAPACITY))
        }
        (Codec::TimestampMicros(is_utc), _) => {
            Self::TimestampMicros(*is_utc, Vec::with_capacity(DEFAULT_CAPACITY))
        }
        (Codec::TimestampNanos(is_utc), _) => {
            Self::TimestampNanos(*is_utc, Vec::with_capacity(DEFAULT_CAPACITY))
        }
        #[cfg(feature = "avro_custom_types")]
        (Codec::DurationNanos, _) => {
            Self::DurationNanosecond(Vec::with_capacity(DEFAULT_CAPACITY))
        }
        #[cfg(feature = "avro_custom_types")]
        (Codec::DurationMicros, _) => {
            Self::DurationMicrosecond(Vec::with_capacity(DEFAULT_CAPACITY))
        }
        #[cfg(feature = "avro_custom_types")]
        (Codec::DurationMillis, _) => {
            Self::DurationMillisecond(Vec::with_capacity(DEFAULT_CAPACITY))
        }
        #[cfg(feature = "avro_custom_types")]
        (Codec::DurationSeconds, _) => {
            Self::DurationSecond(Vec::with_capacity(DEFAULT_CAPACITY))
        }
        (Codec::Fixed(sz), _) => Self::Fixed(*sz, Vec::with_capacity(DEFAULT_CAPACITY)),
        (Codec::Decimal(precision, scale, size), _) => {
            let p = *precision;
            let s = *scale;
            // The narrowing casts below are only handed to a builder after `p` has been
            // checked against a supported maximum precision.
            let prec = p as u8;
            let scl = s.unwrap_or(0) as i8;
            // Pick the narrowest Arrow decimal type whose maximum precision covers `p`.
            #[cfg(feature = "small_decimals")]
            {
                if p <= DECIMAL32_MAX_PRECISION as usize {
                    let builder = Decimal32Builder::with_capacity(DEFAULT_CAPACITY)
                        .with_precision_and_scale(prec, scl)?;
                    Self::Decimal32(p, s, *size, builder)
                } else if p <= DECIMAL64_MAX_PRECISION as usize {
                    let builder = Decimal64Builder::with_capacity(DEFAULT_CAPACITY)
                        .with_precision_and_scale(prec, scl)?;
                    Self::Decimal64(p, s, *size, builder)
                } else if p <= DECIMAL128_MAX_PRECISION as usize {
                    let builder = Decimal128Builder::with_capacity(DEFAULT_CAPACITY)
                        .with_precision_and_scale(prec, scl)?;
                    Self::Decimal128(p, s, *size, builder)
                } else if p <= DECIMAL256_MAX_PRECISION as usize {
                    let builder = Decimal256Builder::with_capacity(DEFAULT_CAPACITY)
                        .with_precision_and_scale(prec, scl)?;
                    Self::Decimal256(p, s, *size, builder)
                } else {
                    return Err(AvroError::ParseError(format!(
                        "Decimal precision {p} exceeds maximum supported"
                    )));
                }
            }
            #[cfg(not(feature = "small_decimals"))]
            {
                if p <= DECIMAL128_MAX_PRECISION as usize {
                    let builder = Decimal128Builder::with_capacity(DEFAULT_CAPACITY)
                        .with_precision_and_scale(prec, scl)?;
                    Self::Decimal128(p, s, *size, builder)
                } else if p <= DECIMAL256_MAX_PRECISION as usize {
                    let builder = Decimal256Builder::with_capacity(DEFAULT_CAPACITY)
                        .with_precision_and_scale(prec, scl)?;
                    Self::Decimal256(p, s, *size, builder)
                } else {
                    return Err(AvroError::ParseError(format!(
                        "Decimal precision {p} exceeds maximum supported"
                    )));
                }
            }
        }
        (Codec::Interval, _) => Self::Duration(IntervalMonthDayNanoBuilder::new()),
        (Codec::List(item), _) => {
            let decoder = Self::try_new(item)?;
            Self::Array(
                Arc::new(item.field_with_name("item")),
                OffsetBufferBuilder::new(DEFAULT_CAPACITY),
                Box::new(decoder),
            )
        }
        (Codec::Enum(symbols), _) => {
            // Keep the writer-to-reader symbol mapping only when one was resolved.
            let res = match data_type.resolution.as_ref() {
                Some(ResolutionInfo::EnumMapping(mapping)) => Some(EnumResolution {
                    mapping: mapping.mapping.clone(),
                    default_index: mapping.default_index,
                }),
                _ => None,
            };
            Self::Enum(Vec::with_capacity(DEFAULT_CAPACITY), symbols.clone(), res)
        }
        (Codec::Struct(fields), _) => {
            let mut arrow_fields = Vec::with_capacity(fields.len());
            let mut encodings = Vec::with_capacity(fields.len());
            for avro_field in fields.iter() {
                let encoding = Self::try_new(avro_field.data_type())?;
                arrow_fields.push(avro_field.field());
                encodings.push(encoding);
            }
            let projector =
                if let Some(ResolutionInfo::Record(rec)) = data_type.resolution.as_ref() {
                    Some(ProjectorBuilder::try_new(rec, fields).build()?)
                } else {
                    None
                };
            Self::Record(arrow_fields.into(), encodings, projector)
        }
        (Codec::Map(child), _) => {
            // Arrow maps are lists of non-nullable `entries` structs with a
            // non-nullable Utf8 `key` and the Avro value type.
            let val_field = child.field_with_name("value");
            let map_field = Arc::new(ArrowField::new(
                "entries",
                DataType::Struct(Fields::from(vec![
                    ArrowField::new("key", DataType::Utf8, false),
                    val_field,
                ])),
                false,
            ));
            let val_dec = Self::try_new(child)?;
            Self::Map(
                map_field,
                OffsetBufferBuilder::new(DEFAULT_CAPACITY),
                OffsetBufferBuilder::new(DEFAULT_CAPACITY),
                Vec::with_capacity(DEFAULT_CAPACITY),
                Box::new(val_dec),
            )
        }
        (Codec::Uuid, _) => Self::Uuid(Vec::with_capacity(DEFAULT_CAPACITY)),
        (Codec::Union(encodings, fields, UnionMode::Dense), _) => {
            let decoders = encodings
                .iter()
                .map(Self::try_new_internal)
                .collect::<Result<Vec<_>, _>>()?;
            if fields.len() != decoders.len() {
                return Err(AvroError::SchemaError(format!(
                    "Union has {} fields but {} decoders",
                    fields.len(),
                    decoders.len()
                )));
            }
            let branch_count = decoders.len();
            // The branch index is encoded as an Avro int, so at most i32::MAX + 1
            // branches are addressable.
            let max_addr = (i32::MAX as usize) + 1;
            if branch_count > max_addr {
                return Err(AvroError::SchemaError(format!(
                    "Union has {branch_count} branches, which exceeds the maximum addressable \
                     branches by an Avro int tag ({} + 1).",
                    i32::MAX
                )));
            }
            let mut builder = UnionDecoderBuilder::new()
                .with_fields(fields.clone())
                .with_branches(decoders);
            // The writer-union / non-union-reader case is handled in `try_new`; here the
            // resolution is attached only when the reader is itself a union.
            if let Some(ResolutionInfo::Union(info)) = data_type.resolution.as_ref() {
                if info.reader_is_union {
                    builder = builder.with_resolved_union(info.clone());
                }
            }
            Self::Union(builder.build()?)
        }
        (Codec::Union(_, _, _), _) => {
            return Err(AvroError::NYI(
                "Sparse Arrow unions are not yet supported".to_string(),
            ));
        }
        #[cfg(feature = "avro_custom_types")]
        (Codec::RunEndEncoded(values_dt, width_bits_or_bytes), _) => {
            let inner = Self::try_new(values_dt)?;
            // The run-end width may be given in bits (16/32/64) or bytes (2/4/8);
            // it is normalised to bytes.
            let byte_width: u8 = match *width_bits_or_bytes {
                2 | 4 | 8 => *width_bits_or_bytes,
                16 => 2,
                32 => 4,
                64 => 8,
                other => {
                    return Err(AvroError::InvalidArgument(format!(
                        "Unsupported run-end width {other} for RunEndEncoded; \
                         expected 16/32/64 bits or 2/4/8 bytes"
                    )));
                }
            };
            Self::RunEndEncoded(byte_width, 0, Box::new(inner))
        }
    };
    Ok(match data_type.nullability() {
        Some(nullability) => {
            // Default: each row starts with a branch tag. A non-union writer resolved
            // into a reader union writes no tag, so its value is decoded directly with
            // the recorded promotion.
            let mut plan = NullablePlan::ReadTag;
            if let Some(ResolutionInfo::Union(info)) = data_type.resolution.as_ref() {
                if !info.writer_is_union && info.reader_is_union {
                    if let Some(Some((_reader_idx, promo))) = info.writer_to_reader.first() {
                        plan = NullablePlan::FromSingle { promotion: *promo };
                    }
                }
            }
            Self::Nullable(
                nullability,
                NullBufferBuilder::new(DEFAULT_CAPACITY),
                Box::new(decoder),
                plan,
            )
        }
        None => decoder,
    })
}
526
527 fn append_null(&mut self) -> Result<(), AvroError> {
529 match self {
530 Self::Null(count) => *count += 1,
531 Self::Boolean(b) => b.append(false),
532 Self::Int32(v) | Self::Date32(v) | Self::TimeMillis(v) => v.push(0),
533 Self::Int64(v)
534 | Self::Int32ToInt64(v)
535 | Self::TimeMicros(v)
536 | Self::TimestampMillis(_, v)
537 | Self::TimestampMicros(_, v)
538 | Self::TimestampNanos(_, v) => v.push(0),
539 #[cfg(feature = "avro_custom_types")]
540 Self::DurationSecond(v)
541 | Self::DurationMillisecond(v)
542 | Self::DurationMicrosecond(v)
543 | Self::DurationNanosecond(v) => v.push(0),
544 Self::Float32(v) | Self::Int32ToFloat32(v) | Self::Int64ToFloat32(v) => v.push(0.),
545 Self::Float64(v)
546 | Self::Int32ToFloat64(v)
547 | Self::Int64ToFloat64(v)
548 | Self::Float32ToFloat64(v) => v.push(0.),
549 Self::Binary(offsets, _)
550 | Self::String(offsets, _)
551 | Self::StringView(offsets, _)
552 | Self::BytesToString(offsets, _)
553 | Self::StringToBytes(offsets, _) => {
554 offsets.push_length(0);
555 }
556 Self::Uuid(v) => {
557 v.extend([0; 16]);
558 }
559 Self::Array(_, offsets, _) => {
560 offsets.push_length(0);
561 }
562 Self::Record(_, e, _) => {
563 for encoding in e.iter_mut() {
564 encoding.append_null()?;
565 }
566 }
567 Self::Map(_, _koff, moff, _, _) => {
568 moff.push_length(0);
569 }
570 Self::Fixed(sz, accum) => {
571 accum.extend(std::iter::repeat_n(0u8, *sz as usize));
572 }
573 #[cfg(feature = "small_decimals")]
574 Self::Decimal32(_, _, _, builder) => builder.append_value(0),
575 #[cfg(feature = "small_decimals")]
576 Self::Decimal64(_, _, _, builder) => builder.append_value(0),
577 Self::Decimal128(_, _, _, builder) => builder.append_value(0),
578 Self::Decimal256(_, _, _, builder) => builder.append_value(i256::ZERO),
579 Self::Enum(indices, _, _) => indices.push(0),
580 Self::Duration(builder) => builder.append_null(),
581 #[cfg(feature = "avro_custom_types")]
582 Self::RunEndEncoded(_, len, inner) => {
583 *len += 1;
584 inner.append_null()?;
585 }
586 Self::Union(u) => u.append_null()?,
587 Self::Nullable(_, null_buffer, inner, _) => {
588 null_buffer.append(false);
589 inner.append_null()?;
590 }
591 }
592 Ok(())
593 }
594
595 fn append_default(&mut self, lit: &AvroLiteral) -> Result<(), AvroError> {
597 match self {
598 Self::Nullable(_, nb, inner, _) => {
599 if matches!(lit, AvroLiteral::Null) {
600 nb.append(false);
601 inner.append_null()
602 } else {
603 nb.append(true);
604 inner.append_default(lit)
605 }
606 }
607 Self::Null(count) => match lit {
608 AvroLiteral::Null => {
609 *count += 1;
610 Ok(())
611 }
612 _ => Err(AvroError::InvalidArgument(
613 "Non-null default for null type".to_string(),
614 )),
615 },
616 Self::Boolean(b) => match lit {
617 AvroLiteral::Boolean(v) => {
618 b.append(*v);
619 Ok(())
620 }
621 _ => Err(AvroError::InvalidArgument(
622 "Default for boolean must be boolean".to_string(),
623 )),
624 },
625 Self::Int32(v) | Self::Date32(v) | Self::TimeMillis(v) => match lit {
626 AvroLiteral::Int(i) => {
627 v.push(*i);
628 Ok(())
629 }
630 _ => Err(AvroError::InvalidArgument(
631 "Default for int32/date32/time-millis must be int".to_string(),
632 )),
633 },
634 #[cfg(feature = "avro_custom_types")]
635 Self::DurationSecond(v)
636 | Self::DurationMillisecond(v)
637 | Self::DurationMicrosecond(v)
638 | Self::DurationNanosecond(v) => match lit {
639 AvroLiteral::Long(i) => {
640 v.push(*i);
641 Ok(())
642 }
643 _ => Err(AvroError::InvalidArgument(
644 "Default for duration long must be long".to_string(),
645 )),
646 },
647 Self::Int64(v)
648 | Self::Int32ToInt64(v)
649 | Self::TimeMicros(v)
650 | Self::TimestampMillis(_, v)
651 | Self::TimestampMicros(_, v)
652 | Self::TimestampNanos(_, v) => match lit {
653 AvroLiteral::Long(i) => {
654 v.push(*i);
655 Ok(())
656 }
657 AvroLiteral::Int(i) => {
658 v.push(*i as i64);
659 Ok(())
660 }
661 _ => Err(AvroError::InvalidArgument(
662 "Default for long/time-micros/timestamp must be long or int".to_string(),
663 )),
664 },
665 Self::Float32(v) | Self::Int32ToFloat32(v) | Self::Int64ToFloat32(v) => match lit {
666 AvroLiteral::Float(f) => {
667 v.push(*f);
668 Ok(())
669 }
670 _ => Err(AvroError::InvalidArgument(
671 "Default for float must be float".to_string(),
672 )),
673 },
674 Self::Float64(v)
675 | Self::Int32ToFloat64(v)
676 | Self::Int64ToFloat64(v)
677 | Self::Float32ToFloat64(v) => match lit {
678 AvroLiteral::Double(f) => {
679 v.push(*f);
680 Ok(())
681 }
682 _ => Err(AvroError::InvalidArgument(
683 "Default for double must be double".to_string(),
684 )),
685 },
686 Self::Binary(offsets, values) | Self::StringToBytes(offsets, values) => match lit {
687 AvroLiteral::Bytes(b) => {
688 offsets.push_length(b.len());
689 values.extend_from_slice(b);
690 Ok(())
691 }
692 _ => Err(AvroError::InvalidArgument(
693 "Default for bytes must be bytes".to_string(),
694 )),
695 },
696 Self::BytesToString(offsets, values)
697 | Self::String(offsets, values)
698 | Self::StringView(offsets, values) => match lit {
699 AvroLiteral::String(s) => {
700 let b = s.as_bytes();
701 offsets.push_length(b.len());
702 values.extend_from_slice(b);
703 Ok(())
704 }
705 _ => Err(AvroError::InvalidArgument(
706 "Default for string must be string".to_string(),
707 )),
708 },
709 Self::Uuid(values) => match lit {
710 AvroLiteral::String(s) => {
711 let uuid = Uuid::try_parse(s).map_err(|e| {
712 AvroError::InvalidArgument(format!("Invalid UUID default: {s} ({e})"))
713 })?;
714 values.extend_from_slice(uuid.as_bytes());
715 Ok(())
716 }
717 _ => Err(AvroError::InvalidArgument(
718 "Default for uuid must be string".to_string(),
719 )),
720 },
721 Self::Fixed(sz, accum) => match lit {
722 AvroLiteral::Bytes(b) => {
723 if b.len() != *sz as usize {
724 return Err(AvroError::InvalidArgument(format!(
725 "Fixed default length {} does not match size {sz}",
726 b.len(),
727 )));
728 }
729 accum.extend_from_slice(b);
730 Ok(())
731 }
732 _ => Err(AvroError::InvalidArgument(
733 "Default for fixed must be bytes".to_string(),
734 )),
735 },
736 #[cfg(feature = "small_decimals")]
737 Self::Decimal32(_, _, _, builder) => {
738 append_decimal_default!(lit, builder, 4, i32, "decimal32")
739 }
740 #[cfg(feature = "small_decimals")]
741 Self::Decimal64(_, _, _, builder) => {
742 append_decimal_default!(lit, builder, 8, i64, "decimal64")
743 }
744 Self::Decimal128(_, _, _, builder) => {
745 append_decimal_default!(lit, builder, 16, i128, "decimal128")
746 }
747 Self::Decimal256(_, _, _, builder) => {
748 append_decimal_default!(lit, builder, 32, i256, "decimal256")
749 }
750 Self::Duration(builder) => match lit {
751 AvroLiteral::Bytes(b) => {
752 if b.len() != 12 {
753 return Err(AvroError::InvalidArgument(format!(
754 "Duration default must be exactly 12 bytes, got {}",
755 b.len()
756 )));
757 }
758 let months = u32::from_le_bytes([b[0], b[1], b[2], b[3]]);
759 let days = u32::from_le_bytes([b[4], b[5], b[6], b[7]]);
760 let millis = u32::from_le_bytes([b[8], b[9], b[10], b[11]]);
761 let nanos = (millis as i64) * 1_000_000;
762 builder.append_value(IntervalMonthDayNano::new(
763 months as i32,
764 days as i32,
765 nanos,
766 ));
767 Ok(())
768 }
769 _ => Err(AvroError::InvalidArgument(
770 "Default for duration must be 12-byte little-endian months/days/millis"
771 .to_string(),
772 )),
773 },
774 Self::Array(_, offsets, inner) => match lit {
775 AvroLiteral::Array(items) => {
776 offsets.push_length(items.len());
777 for item in items {
778 inner.append_default(item)?;
779 }
780 Ok(())
781 }
782 _ => Err(AvroError::InvalidArgument(
783 "Default for array must be an array literal".to_string(),
784 )),
785 },
786 Self::Map(_, koff, moff, kdata, valdec) => match lit {
787 AvroLiteral::Map(entries) => {
788 moff.push_length(entries.len());
789 for (k, v) in entries {
790 let kb = k.as_bytes();
791 koff.push_length(kb.len());
792 kdata.extend_from_slice(kb);
793 valdec.append_default(v)?;
794 }
795 Ok(())
796 }
797 _ => Err(AvroError::InvalidArgument(
798 "Default for map must be a map/object literal".to_string(),
799 )),
800 },
801 Self::Enum(indices, symbols, _) => match lit {
802 AvroLiteral::Enum(sym) => {
803 let pos = symbols.iter().position(|s| s == sym).ok_or_else(|| {
804 AvroError::InvalidArgument(format!(
805 "Enum default symbol {sym:?} not in reader symbols"
806 ))
807 })?;
808 indices.push(pos as i32);
809 Ok(())
810 }
811 _ => Err(AvroError::InvalidArgument(
812 "Default for enum must be a symbol".to_string(),
813 )),
814 },
815 #[cfg(feature = "avro_custom_types")]
816 Self::RunEndEncoded(_, len, inner) => {
817 *len += 1;
818 inner.append_default(lit)
819 }
820 Self::Union(u) => u.append_default(lit),
821 Self::Record(field_meta, decoders, projector) => match lit {
822 AvroLiteral::Map(entries) => {
823 for (i, dec) in decoders.iter_mut().enumerate() {
824 let name = field_meta[i].name();
825 if let Some(sub) = entries.get(name) {
826 dec.append_default(sub)?;
827 } else if let Some(proj) = projector.as_ref() {
828 proj.project_default(dec, i)?;
829 } else {
830 dec.append_null()?;
831 }
832 }
833 Ok(())
834 }
835 AvroLiteral::Null => {
836 for (i, dec) in decoders.iter_mut().enumerate() {
837 if let Some(proj) = projector.as_ref() {
838 proj.project_default(dec, i)?;
839 } else {
840 dec.append_null()?;
841 }
842 }
843 Ok(())
844 }
845 _ => Err(AvroError::InvalidArgument(
846 "Default for record must be a map/object or null".to_string(),
847 )),
848 },
849 }
850 }
851
852 fn decode(&mut self, buf: &mut AvroCursor<'_>) -> Result<(), AvroError> {
854 match self {
855 Self::Null(x) => *x += 1,
856 Self::Boolean(values) => values.append(buf.get_bool()?),
857 Self::Int32(values) | Self::Date32(values) | Self::TimeMillis(values) => {
858 values.push(buf.get_int()?)
859 }
860 Self::Int64(values)
861 | Self::TimeMicros(values)
862 | Self::TimestampMillis(_, values)
863 | Self::TimestampMicros(_, values)
864 | Self::TimestampNanos(_, values) => values.push(buf.get_long()?),
865 #[cfg(feature = "avro_custom_types")]
866 Self::DurationSecond(values)
867 | Self::DurationMillisecond(values)
868 | Self::DurationMicrosecond(values)
869 | Self::DurationNanosecond(values) => values.push(buf.get_long()?),
870 Self::Float32(values) => values.push(buf.get_float()?),
871 Self::Float64(values) => values.push(buf.get_double()?),
872 Self::Int32ToInt64(values) => values.push(buf.get_int()? as i64),
873 Self::Int32ToFloat32(values) => values.push(buf.get_int()? as f32),
874 Self::Int32ToFloat64(values) => values.push(buf.get_int()? as f64),
875 Self::Int64ToFloat32(values) => values.push(buf.get_long()? as f32),
876 Self::Int64ToFloat64(values) => values.push(buf.get_long()? as f64),
877 Self::Float32ToFloat64(values) => values.push(buf.get_float()? as f64),
878 Self::StringToBytes(offsets, values)
879 | Self::BytesToString(offsets, values)
880 | Self::Binary(offsets, values)
881 | Self::String(offsets, values)
882 | Self::StringView(offsets, values) => {
883 let data = buf.get_bytes()?;
884 offsets.push_length(data.len());
885 values.extend_from_slice(data);
886 }
887 Self::Uuid(values) => {
888 let s_bytes = buf.get_bytes()?;
889 let s = std::str::from_utf8(s_bytes).map_err(|e| {
890 AvroError::ParseError(format!("UUID bytes are not valid UTF-8: {e}"))
891 })?;
892 let uuid = Uuid::try_parse(s)
893 .map_err(|e| AvroError::ParseError(format!("Failed to parse uuid: {e}")))?;
894 values.extend_from_slice(uuid.as_bytes());
895 }
896 Self::Array(_, off, encoding) => {
897 let total_items = read_blocks(buf, |cursor| encoding.decode(cursor))?;
898 off.push_length(total_items);
899 }
900 Self::Record(_, encodings, None) => {
901 for encoding in encodings {
902 encoding.decode(buf)?;
903 }
904 }
905 Self::Record(_, encodings, Some(proj)) => {
906 proj.project_record(buf, encodings)?;
907 }
908 Self::Map(_, koff, moff, kdata, valdec) => {
909 let newly_added = read_blocks(buf, |cur| {
910 let kb = cur.get_bytes()?;
911 koff.push_length(kb.len());
912 kdata.extend_from_slice(kb);
913 valdec.decode(cur)
914 })?;
915 moff.push_length(newly_added);
916 }
917 Self::Fixed(sz, accum) => {
918 let fx = buf.get_fixed(*sz as usize)?;
919 accum.extend_from_slice(fx);
920 }
921 #[cfg(feature = "small_decimals")]
922 Self::Decimal32(_, _, size, builder) => {
923 decode_decimal!(size, buf, builder, 4, i32);
924 }
925 #[cfg(feature = "small_decimals")]
926 Self::Decimal64(_, _, size, builder) => {
927 decode_decimal!(size, buf, builder, 8, i64);
928 }
929 Self::Decimal128(_, _, size, builder) => {
930 decode_decimal!(size, buf, builder, 16, i128);
931 }
932 Self::Decimal256(_, _, size, builder) => {
933 decode_decimal!(size, buf, builder, 32, i256);
934 }
935 Self::Enum(indices, _, None) => {
936 indices.push(buf.get_int()?);
937 }
938 Self::Enum(indices, _, Some(res)) => {
939 let raw = buf.get_int()?;
940 let resolved = usize::try_from(raw)
941 .ok()
942 .and_then(|idx| res.mapping.get(idx).copied())
943 .filter(|&idx| idx >= 0)
944 .unwrap_or(res.default_index);
945 if resolved >= 0 {
946 indices.push(resolved);
947 } else {
948 return Err(AvroError::ParseError(format!(
949 "Enum symbol index {raw} not resolvable and no default provided",
950 )));
951 }
952 }
953 Self::Duration(builder) => {
954 let b = buf.get_fixed(12)?;
955 let months = u32::from_le_bytes(b[0..4].try_into().unwrap());
956 let days = u32::from_le_bytes(b[4..8].try_into().unwrap());
957 let millis = u32::from_le_bytes(b[8..12].try_into().unwrap());
958 let nanos = (millis as i64) * 1_000_000;
959 builder.append_value(IntervalMonthDayNano::new(months as i32, days as i32, nanos));
960 }
961 #[cfg(feature = "avro_custom_types")]
962 Self::RunEndEncoded(_, len, inner) => {
963 *len += 1;
964 inner.decode(buf)?;
965 }
966 Self::Union(u) => u.decode(buf)?,
967 Self::Nullable(order, nb, encoding, plan) => {
968 match *plan {
969 NullablePlan::FromSingle { promotion } => {
970 encoding.decode_with_promotion(buf, promotion)?;
971 nb.append(true);
972 }
973 NullablePlan::ReadTag => {
974 let branch = buf.read_vlq()?;
975 let is_not_null = match *order {
976 Nullability::NullFirst => branch != 0,
977 Nullability::NullSecond => branch == 0,
978 };
979 if is_not_null {
980 encoding.decode(buf)?;
982 } else {
983 encoding.append_null()?;
984 }
985 nb.append(is_not_null);
986 }
987 }
988 }
989 }
990 Ok(())
991 }
992
993 fn decode_with_promotion(
994 &mut self,
995 buf: &mut AvroCursor<'_>,
996 promotion: Promotion,
997 ) -> Result<(), AvroError> {
998 #[cfg(feature = "avro_custom_types")]
999 if let Self::RunEndEncoded(_, len, inner) = self {
1000 *len += 1;
1001 return inner.decode_with_promotion(buf, promotion);
1002 }
1003
1004 macro_rules! promote_numeric_to {
1005 ($variant:ident, $getter:ident, $to:ty) => {{
1006 match self {
1007 Self::$variant(v) => {
1008 let x = buf.$getter()?;
1009 v.push(x as $to);
1010 Ok(())
1011 }
1012 other => Err(AvroError::ParseError(format!(
1013 "Promotion {promotion} target mismatch: expected {}, got {}",
1014 stringify!($variant),
1015 <Self as ::std::convert::AsRef<str>>::as_ref(other)
1016 ))),
1017 }
1018 }};
1019 }
1020 match promotion {
1021 Promotion::Direct => self.decode(buf),
1022 Promotion::IntToLong => promote_numeric_to!(Int64, get_int, i64),
1023 Promotion::IntToFloat => promote_numeric_to!(Float32, get_int, f32),
1024 Promotion::IntToDouble => promote_numeric_to!(Float64, get_int, f64),
1025 Promotion::LongToFloat => promote_numeric_to!(Float32, get_long, f32),
1026 Promotion::LongToDouble => promote_numeric_to!(Float64, get_long, f64),
1027 Promotion::FloatToDouble => promote_numeric_to!(Float64, get_float, f64),
1028 Promotion::StringToBytes => match self {
1029 Self::Binary(offsets, values) | Self::StringToBytes(offsets, values) => {
1030 let data = buf.get_bytes()?;
1031 offsets.push_length(data.len());
1032 values.extend_from_slice(data);
1033 Ok(())
1034 }
1035 other => Err(AvroError::ParseError(format!(
1036 "Promotion {promotion} target mismatch: expected bytes (Binary/StringToBytes), got {}",
1037 <Self as AsRef<str>>::as_ref(other)
1038 ))),
1039 },
1040 Promotion::BytesToString => match self {
1041 Self::String(offsets, values)
1042 | Self::StringView(offsets, values)
1043 | Self::BytesToString(offsets, values) => {
1044 let data = buf.get_bytes()?;
1045 offsets.push_length(data.len());
1046 values.extend_from_slice(data);
1047 Ok(())
1048 }
1049 other => Err(AvroError::ParseError(format!(
1050 "Promotion {promotion} target mismatch: expected string (String/StringView/BytesToString), got {}",
1051 <Self as AsRef<str>>::as_ref(other)
1052 ))),
1053 },
1054 }
1055 }
1056
1057 fn flush(&mut self, nulls: Option<NullBuffer>) -> Result<ArrayRef, AvroError> {
1059 Ok(match self {
1060 Self::Nullable(_, n, e, _) => e.flush(n.finish())?,
1061 Self::Null(size) => Arc::new(NullArray::new(std::mem::replace(size, 0))),
1062 Self::Boolean(b) => Arc::new(BooleanArray::new(b.finish(), nulls)),
1063 Self::Int32(values) => Arc::new(flush_primitive::<Int32Type>(values, nulls)),
1064 Self::Date32(values) => Arc::new(flush_primitive::<Date32Type>(values, nulls)),
1065 Self::Int64(values) => Arc::new(flush_primitive::<Int64Type>(values, nulls)),
1066 Self::TimeMillis(values) => {
1067 Arc::new(flush_primitive::<Time32MillisecondType>(values, nulls))
1068 }
1069 Self::TimeMicros(values) => {
1070 Arc::new(flush_primitive::<Time64MicrosecondType>(values, nulls))
1071 }
1072 Self::TimestampMillis(is_utc, values) => Arc::new(
1073 flush_primitive::<TimestampMillisecondType>(values, nulls)
1074 .with_timezone_opt(is_utc.then(|| "+00:00")),
1075 ),
1076 Self::TimestampMicros(is_utc, values) => Arc::new(
1077 flush_primitive::<TimestampMicrosecondType>(values, nulls)
1078 .with_timezone_opt(is_utc.then(|| "+00:00")),
1079 ),
1080 Self::TimestampNanos(is_utc, values) => Arc::new(
1081 flush_primitive::<TimestampNanosecondType>(values, nulls)
1082 .with_timezone_opt(is_utc.then(|| "+00:00")),
1083 ),
1084 #[cfg(feature = "avro_custom_types")]
1085 Self::DurationSecond(values) => {
1086 Arc::new(flush_primitive::<DurationSecondType>(values, nulls))
1087 }
1088 #[cfg(feature = "avro_custom_types")]
1089 Self::DurationMillisecond(values) => {
1090 Arc::new(flush_primitive::<DurationMillisecondType>(values, nulls))
1091 }
1092 #[cfg(feature = "avro_custom_types")]
1093 Self::DurationMicrosecond(values) => {
1094 Arc::new(flush_primitive::<DurationMicrosecondType>(values, nulls))
1095 }
1096 #[cfg(feature = "avro_custom_types")]
1097 Self::DurationNanosecond(values) => {
1098 Arc::new(flush_primitive::<DurationNanosecondType>(values, nulls))
1099 }
1100 Self::Float32(values) => Arc::new(flush_primitive::<Float32Type>(values, nulls)),
1101 Self::Float64(values) => Arc::new(flush_primitive::<Float64Type>(values, nulls)),
1102 Self::Int32ToInt64(values) => Arc::new(flush_primitive::<Int64Type>(values, nulls)),
1103 Self::Int32ToFloat32(values) | Self::Int64ToFloat32(values) => {
1104 Arc::new(flush_primitive::<Float32Type>(values, nulls))
1105 }
1106 Self::Int32ToFloat64(values)
1107 | Self::Int64ToFloat64(values)
1108 | Self::Float32ToFloat64(values) => {
1109 Arc::new(flush_primitive::<Float64Type>(values, nulls))
1110 }
1111 Self::StringToBytes(offsets, values) | Self::Binary(offsets, values) => {
1112 let offsets = flush_offsets(offsets);
1113 let values = flush_values(values).into();
1114 Arc::new(BinaryArray::try_new(offsets, values, nulls)?)
1115 }
1116 Self::BytesToString(offsets, values) | Self::String(offsets, values) => {
1117 let offsets = flush_offsets(offsets);
1118 let values = flush_values(values).into();
1119 Arc::new(StringArray::try_new(offsets, values, nulls)?)
1120 }
1121 Self::StringView(offsets, values) => {
1122 let offsets = flush_offsets(offsets);
1123 let values = flush_values(values);
1124 let array = StringArray::try_new(offsets, values.into(), nulls.clone())?;
1125 let values: Vec<&str> = (0..array.len())
1126 .map(|i| {
1127 if array.is_valid(i) {
1128 array.value(i)
1129 } else {
1130 ""
1131 }
1132 })
1133 .collect();
1134 Arc::new(StringViewArray::from(values))
1135 }
1136 Self::Array(field, offsets, values) => {
1137 let values = values.flush(None)?;
1138 let offsets = flush_offsets(offsets);
1139 Arc::new(ListArray::try_new(field.clone(), offsets, values, nulls)?)
1140 }
1141 Self::Record(fields, encodings, _) => {
1142 let arrays = encodings
1143 .iter_mut()
1144 .map(|x| x.flush(None))
1145 .collect::<Result<Vec<_>, _>>()?;
1146 Arc::new(StructArray::try_new(fields.clone(), arrays, nulls)?)
1147 }
1148 Self::Map(map_field, k_off, m_off, kdata, valdec) => {
1149 let moff = flush_offsets(m_off);
1150 let koff = flush_offsets(k_off);
1151 let kd = flush_values(kdata).into();
1152 let val_arr = valdec.flush(None)?;
1153 let key_arr = StringArray::try_new(koff, kd, None)?;
1154 if key_arr.len() != val_arr.len() {
1155 return Err(AvroError::InvalidArgument(format!(
1156 "Map keys length ({}) != map values length ({})",
1157 key_arr.len(),
1158 val_arr.len()
1159 )));
1160 }
1161 let final_len = moff.len() - 1;
1162 if let Some(n) = &nulls {
1163 if n.len() != final_len {
1164 return Err(AvroError::InvalidArgument(format!(
1165 "Map array null buffer length {} != final map length {final_len}",
1166 n.len()
1167 )));
1168 }
1169 }
1170 let entries_fields = match map_field.data_type() {
1171 DataType::Struct(fields) => fields.clone(),
1172 other => {
1173 return Err(AvroError::InvalidArgument(format!(
1174 "Map entries field must be a Struct, got {other:?}"
1175 )));
1176 }
1177 };
1178 let entries_struct =
1179 StructArray::try_new(entries_fields, vec![Arc::new(key_arr), val_arr], None)?;
1180 let map_arr =
1181 MapArray::try_new(map_field.clone(), moff, entries_struct, nulls, false)?;
1182 Arc::new(map_arr)
1183 }
1184 Self::Fixed(sz, accum) => {
1185 let b: Buffer = flush_values(accum).into();
1186 let arr = FixedSizeBinaryArray::try_new(*sz, b, nulls)
1187 .map_err(|e| AvroError::ParseError(e.to_string()))?;
1188 Arc::new(arr)
1189 }
1190 Self::Uuid(values) => {
1191 let arr = FixedSizeBinaryArray::try_new(16, std::mem::take(values).into(), nulls)
1192 .map_err(|e| AvroError::ParseError(e.to_string()))?;
1193 Arc::new(arr)
1194 }
1195 #[cfg(feature = "small_decimals")]
1196 Self::Decimal32(precision, scale, _, builder) => {
1197 flush_decimal!(builder, precision, scale, nulls, Decimal32Array)
1198 }
1199 #[cfg(feature = "small_decimals")]
1200 Self::Decimal64(precision, scale, _, builder) => {
1201 flush_decimal!(builder, precision, scale, nulls, Decimal64Array)
1202 }
1203 Self::Decimal128(precision, scale, _, builder) => {
1204 flush_decimal!(builder, precision, scale, nulls, Decimal128Array)
1205 }
1206 Self::Decimal256(precision, scale, _, builder) => {
1207 flush_decimal!(builder, precision, scale, nulls, Decimal256Array)
1208 }
1209 Self::Enum(indices, symbols, _) => flush_dict(indices, symbols, nulls)?,
1210 Self::Duration(builder) => {
1211 let (_, vals, _) = builder.finish().into_parts();
1212 let vals = IntervalMonthDayNanoArray::try_new(vals, nulls)
1213 .map_err(|e| AvroError::ParseError(e.to_string()))?;
1214 Arc::new(vals)
1215 }
1216 #[cfg(feature = "avro_custom_types")]
1217 Self::RunEndEncoded(width, len, inner) => {
1218 let values = inner.flush(nulls)?;
1219 let n = *len;
1220 let arr = values.as_ref();
1221 let mut run_starts: Vec<usize> = Vec::with_capacity(n);
1222 if n > 0 {
1223 run_starts.push(0);
1224 for i in 1..n {
1225 if !values_equal_at(arr, i - 1, i) {
1226 run_starts.push(i);
1227 }
1228 }
1229 }
1230 if n > (u32::MAX as usize) {
1231 return Err(AvroError::InvalidArgument(format!(
1232 "RunEndEncoded length {n} exceeds maximum supported by UInt32 indices for take",
1233 )));
1234 }
1235 let run_count = run_starts.len();
1236 let take_idx: PrimitiveArray<UInt32Type> =
1237 run_starts.iter().map(|&s| s as u32).collect();
1238 let per_run_values = if run_count == 0 {
1239 values.slice(0, 0)
1240 } else {
1241 take(arr, &take_idx, Option::from(TakeOptions::default())).map_err(|e| {
1242 AvroError::ParseError(format!("take() for REE values failed: {e}"))
1243 })?
1244 };
1245
1246 macro_rules! build_run_array {
1247 ($Native:ty, $ArrowTy:ty) => {{
1248 let mut ends: Vec<$Native> = Vec::with_capacity(run_count);
1249 for (idx, &_start) in run_starts.iter().enumerate() {
1250 let end = if idx + 1 < run_count {
1251 run_starts[idx + 1]
1252 } else {
1253 n
1254 };
1255 ends.push(end as $Native);
1256 }
1257 let ends: PrimitiveArray<$ArrowTy> = ends.into_iter().collect();
1258 let run_arr = RunArray::<$ArrowTy>::try_new(&ends, per_run_values.as_ref())
1259 .map_err(|e| AvroError::ParseError(e.to_string()))?;
1260 Arc::new(run_arr) as ArrayRef
1261 }};
1262 }
1263 match *width {
1264 2 => {
1265 if n > i16::MAX as usize {
1266 return Err(AvroError::InvalidArgument(format!(
1267 "RunEndEncoded length {n} exceeds i16::MAX for run end width 2"
1268 )));
1269 }
1270 build_run_array!(i16, Int16Type)
1271 }
1272 4 => build_run_array!(i32, Int32Type),
1273 8 => build_run_array!(i64, Int64Type),
1274 other => {
1275 return Err(AvroError::InvalidArgument(format!(
1276 "Unsupported run-end width {other} for RunEndEncoded"
1277 )));
1278 }
1279 }
1280 }
1281 Self::Union(u) => u.flush(nulls)?,
1282 })
1283 }
1284}
1285
/// Per-writer-branch table translating a writer union tag into a reader branch.
///
/// Both slices are indexed by writer branch index and always have the same length.
#[derive(Debug)]
struct DispatchLookupTable {
    /// Reader branch index for each writer branch, or `NO_SOURCE` when the writer
    /// branch has no reader counterpart.
    to_reader: Box<[i8]>,
    /// Promotion applied when decoding each writer branch (`Direct` placeholder for
    /// unmapped branches).
    promotion: Box<[Promotion]>,
}
1311
/// Sentinel stored in `DispatchLookupTable::to_reader` for a writer branch with no
/// matching reader branch; `resolve` treats any negative entry as unresolvable.
const NO_SOURCE: i8 = -1;
1315
1316impl DispatchLookupTable {
1317 fn from_writer_to_reader(
1318 promotion_map: &[Option<(usize, Promotion)>],
1319 ) -> Result<Self, AvroError> {
1320 let mut to_reader = Vec::with_capacity(promotion_map.len());
1321 let mut promotion = Vec::with_capacity(promotion_map.len());
1322 for map in promotion_map {
1323 match *map {
1324 Some((idx, promo)) => {
1325 let idx_i8 = i8::try_from(idx).map_err(|_| {
1326 AvroError::SchemaError(format!(
1327 "Reader branch index {idx} exceeds i8 range (max {})",
1328 i8::MAX
1329 ))
1330 })?;
1331 to_reader.push(idx_i8);
1332 promotion.push(promo);
1333 }
1334 None => {
1335 to_reader.push(NO_SOURCE);
1336 promotion.push(Promotion::Direct);
1337 }
1338 }
1339 }
1340 Ok(Self {
1341 to_reader: to_reader.into_boxed_slice(),
1342 promotion: promotion.into_boxed_slice(),
1343 })
1344 }
1345
1346 #[inline]
1348 fn resolve(&self, writer_index: usize) -> Option<(usize, Promotion)> {
1349 let reader_index = *self.to_reader.get(writer_index)?;
1350 (reader_index >= 0).then(|| (reader_index as usize, self.promotion[writer_index]))
1351 }
1352}
1353
/// Decodes Avro union values, either into a dense Arrow `UnionArray` or, when a
/// writer union is read as a single reader type, into one target decoder.
#[derive(Debug)]
struct UnionDecoder {
    /// Arrow union fields of the reader union (type ids and child fields).
    fields: UnionFields,
    /// Arrow type id emitted for each decoded row.
    type_ids: Vec<i8>,
    /// Offset of each decoded row within its child array (dense union layout).
    offsets: Vec<i32>,
    /// One child decoder per reader branch.
    branches: Vec<Decoder>,
    /// Rows emitted so far into each branch; used as the next row's offset.
    counts: Vec<i32>,
    /// Arrow type id of each reader branch, in `fields` order.
    reader_type_codes: Vec<i8>,
    /// Branch that receives rows appended via `append_default`.
    default_emit_idx: usize,
    /// Branch that receives rows appended via `append_null` (the first `Null` branch,
    /// falling back to `default_emit_idx`).
    null_emit_idx: usize,
    /// How incoming union tags map to reader branches.
    plan: UnionReadPlan,
}
1366
1367impl Default for UnionDecoder {
1368 fn default() -> Self {
1369 Self {
1370 fields: UnionFields::empty(),
1371 type_ids: Vec::new(),
1372 offsets: Vec::new(),
1373 branches: Vec::new(),
1374 counts: Vec::new(),
1375 reader_type_codes: Vec::new(),
1376 default_emit_idx: 0,
1377 null_emit_idx: 0,
1378 plan: UnionReadPlan::Passthrough,
1379 }
1380 }
1381}
1382
/// Strategy for mapping union data on the wire to reader branches.
#[derive(Debug)]
enum UnionReadPlan {
    /// Writer and reader are both unions: read the writer tag and translate it through
    /// the lookup table.
    ReaderUnion {
        lookup_table: DispatchLookupTable,
    },
    /// Writer is a single type and reader is a union: no tag is read; every value goes
    /// to `reader_idx`, decoded with `promotion`.
    FromSingle {
        reader_idx: usize,
        promotion: Promotion,
    },
    /// Writer is a union and reader is a single type: read the writer tag, then decode
    /// every resolvable branch into `target`.
    ToSingle {
        target: Box<Decoder>,
        lookup_table: DispatchLookupTable,
    },
    /// No resolution information: the writer tag is used directly as the reader branch.
    Passthrough,
}
1398
1399impl UnionDecoder {
1400 fn try_new(
1401 fields: UnionFields,
1402 branches: Vec<Decoder>,
1403 resolved: Option<ResolvedUnion>,
1404 ) -> Result<Self, AvroError> {
1405 let reader_type_codes = fields.iter().map(|(tid, _)| tid).collect::<Vec<i8>>();
1406 let null_branch = branches.iter().position(|b| matches!(b, Decoder::Null(_)));
1407 let default_emit_idx = 0;
1408 let null_emit_idx = null_branch.unwrap_or(default_emit_idx);
1409 let branch_len = branches.len().max(reader_type_codes.len());
1410 let max_addr = (i32::MAX as usize) + 1;
1412 if branches.len() > max_addr {
1413 return Err(AvroError::SchemaError(format!(
1414 "Reader union has {} branches, which exceeds the maximum addressable \
1415 branches by an Avro int tag ({} + 1).",
1416 branches.len(),
1417 i32::MAX
1418 )));
1419 }
1420 Ok(Self {
1421 fields,
1422 type_ids: Vec::with_capacity(DEFAULT_CAPACITY),
1423 offsets: Vec::with_capacity(DEFAULT_CAPACITY),
1424 branches,
1425 counts: vec![0; branch_len],
1426 reader_type_codes,
1427 default_emit_idx,
1428 null_emit_idx,
1429 plan: Self::plan_from_resolved(resolved)?,
1430 })
1431 }
1432
1433 fn try_new_from_writer_union(
1434 info: ResolvedUnion,
1435 target: Box<Decoder>,
1436 ) -> Result<Self, AvroError> {
1437 debug_assert!(info.writer_is_union && !info.reader_is_union);
1439 let lookup_table = DispatchLookupTable::from_writer_to_reader(&info.writer_to_reader)?;
1440 Ok(Self {
1441 plan: UnionReadPlan::ToSingle {
1442 target,
1443 lookup_table,
1444 },
1445 ..Self::default()
1446 })
1447 }
1448
1449 fn plan_from_resolved(resolved: Option<ResolvedUnion>) -> Result<UnionReadPlan, AvroError> {
1450 let Some(info) = resolved else {
1451 return Ok(UnionReadPlan::Passthrough);
1452 };
1453 match (info.writer_is_union, info.reader_is_union) {
1454 (true, true) => {
1455 let lookup_table =
1456 DispatchLookupTable::from_writer_to_reader(&info.writer_to_reader)?;
1457 Ok(UnionReadPlan::ReaderUnion { lookup_table })
1458 }
1459 (false, true) => {
1460 let Some(&(reader_idx, promotion)) =
1461 info.writer_to_reader.first().and_then(Option::as_ref)
1462 else {
1463 return Err(AvroError::SchemaError(
1464 "Writer type does not match any reader union branch".to_string(),
1465 ));
1466 };
1467 Ok(UnionReadPlan::FromSingle {
1468 reader_idx,
1469 promotion,
1470 })
1471 }
1472 (true, false) => Err(AvroError::InvalidArgument(
1473 "UnionDecoder::try_new cannot build writer-union to single; use UnionDecoderBuilder with a target"
1474 .to_string(),
1475 )),
1476 _ => Err(AvroError::SchemaError(
1478 "ResolvedUnion constructed for non-union sides; resolver should return None"
1479 .to_string(),
1480 )),
1481 }
1482 }
1483
1484 #[inline]
1485 fn read_tag(buf: &mut AvroCursor<'_>) -> Result<usize, AvroError> {
1486 let raw = buf.get_long()?;
1491 if raw < 0 {
1492 return Err(AvroError::ParseError(format!(
1493 "Negative union branch index {raw}"
1494 )));
1495 }
1496 usize::try_from(raw).map_err(|_| {
1497 AvroError::ParseError(format!(
1498 "Union branch index {raw} does not fit into usize on this platform ({}-bit)",
1499 (usize::BITS as usize)
1500 ))
1501 })
1502 }
1503
1504 #[inline]
1505 fn emit_to(&mut self, reader_idx: usize) -> Result<&mut Decoder, AvroError> {
1506 let branches_len = self.branches.len();
1507 let Some(reader_branch) = self.branches.get_mut(reader_idx) else {
1508 return Err(AvroError::ParseError(format!(
1509 "Union branch index {reader_idx} out of range ({branches_len} branches)"
1510 )));
1511 };
1512 self.type_ids.push(self.reader_type_codes[reader_idx]);
1513 self.offsets.push(self.counts[reader_idx]);
1514 self.counts[reader_idx] += 1;
1515 Ok(reader_branch)
1516 }
1517
1518 #[inline]
1519 fn on_decoder<F>(&mut self, fallback_idx: usize, action: F) -> Result<(), AvroError>
1520 where
1521 F: FnOnce(&mut Decoder) -> Result<(), AvroError>,
1522 {
1523 if let UnionReadPlan::ToSingle { target, .. } = &mut self.plan {
1524 return action(target);
1525 }
1526 let reader_idx = match &self.plan {
1527 UnionReadPlan::FromSingle { reader_idx, .. } => *reader_idx,
1528 _ => fallback_idx,
1529 };
1530 self.emit_to(reader_idx).and_then(action)
1531 }
1532
1533 fn append_null(&mut self) -> Result<(), AvroError> {
1534 self.on_decoder(self.null_emit_idx, |decoder| decoder.append_null())
1535 }
1536
1537 fn append_default(&mut self, lit: &AvroLiteral) -> Result<(), AvroError> {
1538 self.on_decoder(self.default_emit_idx, |decoder| decoder.append_default(lit))
1539 }
1540
1541 fn decode(&mut self, buf: &mut AvroCursor<'_>) -> Result<(), AvroError> {
1542 let (reader_idx, promotion) = match &mut self.plan {
1543 UnionReadPlan::Passthrough => (Self::read_tag(buf)?, Promotion::Direct),
1544 UnionReadPlan::ReaderUnion { lookup_table } => {
1545 let idx = Self::read_tag(buf)?;
1546 lookup_table.resolve(idx).ok_or_else(|| {
1547 AvroError::ParseError(format!(
1548 "Union branch index {idx} not resolvable by reader schema"
1549 ))
1550 })?
1551 }
1552 UnionReadPlan::FromSingle {
1553 reader_idx,
1554 promotion,
1555 } => (*reader_idx, *promotion),
1556 UnionReadPlan::ToSingle {
1557 target,
1558 lookup_table,
1559 } => {
1560 let idx = Self::read_tag(buf)?;
1561 return match lookup_table.resolve(idx) {
1562 Some((_, promotion)) => target.decode_with_promotion(buf, promotion),
1563 None => Err(AvroError::ParseError(format!(
1564 "Writer union branch {idx} does not resolve to reader type"
1565 ))),
1566 };
1567 }
1568 };
1569 let decoder = self.emit_to(reader_idx)?;
1570 decoder.decode_with_promotion(buf, promotion)
1571 }
1572
1573 fn flush(&mut self, nulls: Option<NullBuffer>) -> Result<ArrayRef, AvroError> {
1574 if let UnionReadPlan::ToSingle { target, .. } = &mut self.plan {
1575 return target.flush(nulls);
1576 }
1577 debug_assert!(
1578 nulls.is_none(),
1579 "UnionArray does not accept a validity bitmap; \
1580 nulls should have been materialized as a Null child during decode"
1581 );
1582 let children = self
1583 .branches
1584 .iter_mut()
1585 .map(|d| d.flush(None))
1586 .collect::<Result<Vec<_>, _>>()?;
1587 let arr = UnionArray::try_new(
1588 self.fields.clone(),
1589 flush_values(&mut self.type_ids).into_iter().collect(),
1590 Some(flush_values(&mut self.offsets).into_iter().collect()),
1591 children,
1592 )
1593 .map_err(|e| AvroError::ParseError(e.to_string()))?;
1594 Ok(Arc::new(arr))
1595 }
1596}
1597
/// Builder for `UnionDecoder`; `build` documents the accepted field combinations.
#[derive(Debug, Default)]
struct UnionDecoderBuilder {
    /// Reader union fields (reader-union mode).
    fields: Option<UnionFields>,
    /// Child decoders, one per reader branch (reader-union mode).
    branches: Option<Vec<Decoder>>,
    /// Writer/reader resolution info, if schema resolution produced any.
    resolved: Option<ResolvedUnion>,
    /// Single reader decoder (writer-union to single-type mode).
    target: Option<Box<Decoder>>,
}
1605
1606impl UnionDecoderBuilder {
1607 fn new() -> Self {
1608 Self::default()
1609 }
1610
1611 fn with_fields(mut self, fields: UnionFields) -> Self {
1612 self.fields = Some(fields);
1613 self
1614 }
1615
1616 fn with_branches(mut self, branches: Vec<Decoder>) -> Self {
1617 self.branches = Some(branches);
1618 self
1619 }
1620
1621 fn with_resolved_union(mut self, resolved_union: ResolvedUnion) -> Self {
1622 self.resolved = Some(resolved_union);
1623 self
1624 }
1625
1626 fn with_target(mut self, target: Box<Decoder>) -> Self {
1627 self.target = Some(target);
1628 self
1629 }
1630
1631 fn build(self) -> Result<UnionDecoder, AvroError> {
1632 match (self.resolved, self.fields, self.branches, self.target) {
1633 (resolved, Some(fields), Some(branches), None) => {
1634 UnionDecoder::try_new(fields, branches, resolved)
1635 }
1636 (Some(info), None, None, Some(target))
1637 if info.writer_is_union && !info.reader_is_union =>
1638 {
1639 UnionDecoder::try_new_from_writer_union(info, target)
1640 }
1641 _ => Err(AvroError::InvalidArgument(
1642 "Invalid UnionDecoderBuilder configuration: expected either \
1643 (fields + branches + resolved) with no target for reader-unions, or \
1644 (resolved + target) with no fields/branches for writer-union to single."
1645 .to_string(),
1646 )),
1647 }
1648 }
1649}
1650
/// What to do with an Avro block whose item count is negative, i.e. a block whose
/// count is followed by its size in bytes.
#[derive(Debug, Copy, Clone)]
enum NegativeBlockBehavior {
    /// Read the byte size, then still visit each item individually.
    ProcessItems,
    /// Jump over the whole block using its byte size without visiting items.
    SkipBySize,
}
1656
1657#[inline]
1658fn skip_blocks(
1659 buf: &mut AvroCursor,
1660 mut skip_item: impl FnMut(&mut AvroCursor) -> Result<(), AvroError>,
1661) -> Result<usize, AvroError> {
1662 process_blockwise(
1663 buf,
1664 move |c| skip_item(c),
1665 NegativeBlockBehavior::SkipBySize,
1666 )
1667}
1668
1669#[inline]
1670fn flush_dict(
1671 indices: &mut Vec<i32>,
1672 symbols: &[String],
1673 nulls: Option<NullBuffer>,
1674) -> Result<ArrayRef, AvroError> {
1675 let keys = flush_primitive::<Int32Type>(indices, nulls);
1676 let values = Arc::new(StringArray::from_iter_values(
1677 symbols.iter().map(|s| s.as_str()),
1678 ));
1679 DictionaryArray::try_new(keys, values)
1680 .map_err(Into::into)
1681 .map(|arr| Arc::new(arr) as ArrayRef)
1682}
1683
1684#[inline]
1685fn read_blocks(
1686 buf: &mut AvroCursor,
1687 decode_entry: impl FnMut(&mut AvroCursor) -> Result<(), AvroError>,
1688) -> Result<usize, AvroError> {
1689 process_blockwise(buf, decode_entry, NegativeBlockBehavior::ProcessItems)
1690}
1691
1692#[inline]
1693fn process_blockwise(
1694 buf: &mut AvroCursor,
1695 mut on_item: impl FnMut(&mut AvroCursor) -> Result<(), AvroError>,
1696 negative_behavior: NegativeBlockBehavior,
1697) -> Result<usize, AvroError> {
1698 let mut total = 0usize;
1699 loop {
1700 let block_count = buf.get_long()?;
1705 match block_count.cmp(&0) {
1706 Ordering::Equal => break,
1707 Ordering::Less => {
1708 let count = (-block_count) as usize;
1709 let size_in_bytes = buf.get_long()? as usize;
1711 match negative_behavior {
1712 NegativeBlockBehavior::ProcessItems => {
1713 for _ in 0..count {
1715 on_item(buf)?;
1716 }
1717 }
1718 NegativeBlockBehavior::SkipBySize => {
1719 let _ = buf.get_fixed(size_in_bytes)?;
1721 }
1722 }
1723 total += count;
1724 }
1725 Ordering::Greater => {
1726 let count = block_count as usize;
1727 for _ in 0..count {
1728 on_item(buf)?;
1729 }
1730 total += count;
1731 }
1732 }
1733 }
1734 Ok(total)
1735}
1736
1737#[inline]
1738fn flush_values<T>(values: &mut Vec<T>) -> Vec<T> {
1739 std::mem::replace(values, Vec::with_capacity(DEFAULT_CAPACITY))
1740}
1741
1742#[inline]
1743fn flush_offsets(offsets: &mut OffsetBufferBuilder<i32>) -> OffsetBuffer<i32> {
1744 std::mem::replace(offsets, OffsetBufferBuilder::new(DEFAULT_CAPACITY)).finish()
1745}
1746
1747#[inline]
1748fn flush_primitive<T: ArrowPrimitiveType>(
1749 values: &mut Vec<T::Native>,
1750 nulls: Option<NullBuffer>,
1751) -> PrimitiveArray<T> {
1752 PrimitiveArray::new(flush_values(values).into(), nulls)
1753}
1754
1755#[inline]
1756fn read_decimal_bytes_be<const N: usize>(
1757 buf: &mut AvroCursor<'_>,
1758 size: &Option<usize>,
1759) -> Result<[u8; N], AvroError> {
1760 match size {
1761 Some(n) if *n == N => {
1762 let raw = buf.get_fixed(N)?;
1763 let mut arr = [0u8; N];
1764 arr.copy_from_slice(raw);
1765 Ok(arr)
1766 }
1767 Some(n) => {
1768 let raw = buf.get_fixed(*n)?;
1769 sign_cast_to::<N>(raw)
1770 }
1771 None => {
1772 let raw = buf.get_bytes()?;
1773 sign_cast_to::<N>(raw)
1774 }
1775 }
1776}
1777
1778#[inline]
1787fn sign_cast_to<const N: usize>(raw: &[u8]) -> Result<[u8; N], AvroError> {
1788 let len = raw.len();
1789 if len == N {
1791 let mut out = [0u8; N];
1792 out.copy_from_slice(raw);
1793 return Ok(out);
1794 }
1795 let first = raw.first().copied().unwrap_or(0u8);
1797 let sign_byte = if (first & 0x80) == 0 { 0x00 } else { 0xFF };
1798 let mut out = [sign_byte; N];
1800 if len > N {
1801 let extra = len - N;
1804 if raw[..extra].iter().any(|&b| b != sign_byte) {
1806 return Err(AvroError::ParseError(format!(
1807 "Decimal value with {} bytes cannot be represented in {} bytes without overflow",
1808 len, N
1809 )));
1810 }
1811 if N > 0 {
1812 let first_kept = raw[extra];
1813 let sign_bit_mismatch = ((first_kept ^ sign_byte) & 0x80) != 0;
1814 if sign_bit_mismatch {
1815 return Err(AvroError::ParseError(format!(
1816 "Decimal value with {} bytes cannot be represented in {} bytes without overflow",
1817 len, N
1818 )));
1819 }
1820 }
1821 out.copy_from_slice(&raw[extra..]);
1822 return Ok(out);
1823 }
1824 out[N - len..].copy_from_slice(raw);
1825 Ok(out)
1826}
1827
/// Returns whether rows `i` and `j` of `arr` hold the same value.
///
/// Two nulls compare equal; a null never equals a non-null.
#[cfg(feature = "avro_custom_types")]
#[inline]
fn values_equal_at(arr: &dyn Array, i: usize, j: usize) -> bool {
    let (left_null, right_null) = (arr.is_null(i), arr.is_null(j));
    if left_null || right_null {
        return left_null && right_null;
    }
    arr.slice(i, 1) == arr.slice(j, 1)
}
1841
/// Reads a writer record into the reader's field decoders: writer-only fields are
/// skipped and reader-only fields are filled with defaults.
#[derive(Debug)]
struct Projector {
    /// For each writer field, the reader field index it decodes into, or `None` if the
    /// reader has no such field.
    writer_to_reader: Arc<[Option<usize>]>,
    /// For each writer field, the skipper used when it is not mapped to the reader.
    skip_decoders: Vec<Option<Skipper>>,
    /// For each reader field, its resolved default literal, if any.
    field_defaults: Vec<Option<AvroLiteral>>,
    /// `(reader field index, literal)` pairs appended after every projected record.
    default_injections: Arc<[(usize, AvroLiteral)]>,
}
1849
/// Collects the inputs needed to build a `Projector` for one resolved record.
#[derive(Debug)]
struct ProjectorBuilder<'a> {
    /// Writer/reader field mapping produced by schema resolution.
    rec: &'a ResolvedRecord,
    /// The reader's record fields, used to look up per-field defaults.
    reader_fields: Arc<[AvroField]>,
}
1855
1856impl<'a> ProjectorBuilder<'a> {
1857 #[inline]
1858 fn try_new(rec: &'a ResolvedRecord, reader_fields: &Arc<[AvroField]>) -> Self {
1859 Self {
1860 rec,
1861 reader_fields: reader_fields.clone(),
1862 }
1863 }
1864
1865 #[inline]
1866 fn build(self) -> Result<Projector, AvroError> {
1867 let reader_fields = self.reader_fields;
1868 let mut field_defaults: Vec<Option<AvroLiteral>> = Vec::with_capacity(reader_fields.len());
1869 for avro_field in reader_fields.as_ref() {
1870 if let Some(ResolutionInfo::DefaultValue(lit)) =
1871 avro_field.data_type().resolution.as_ref()
1872 {
1873 field_defaults.push(Some(lit.clone()));
1874 } else {
1875 field_defaults.push(None);
1876 }
1877 }
1878 let mut default_injections: Vec<(usize, AvroLiteral)> =
1879 Vec::with_capacity(self.rec.default_fields.len());
1880 for &idx in self.rec.default_fields.as_ref() {
1881 let lit = field_defaults
1882 .get(idx)
1883 .and_then(|lit| lit.clone())
1884 .unwrap_or(AvroLiteral::Null);
1885 default_injections.push((idx, lit));
1886 }
1887 let mut skip_decoders: Vec<Option<Skipper>> =
1888 Vec::with_capacity(self.rec.skip_fields.len());
1889 for datatype in self.rec.skip_fields.as_ref() {
1890 let skipper = match datatype {
1891 Some(datatype) => Some(Skipper::from_avro(datatype)?),
1892 None => None,
1893 };
1894 skip_decoders.push(skipper);
1895 }
1896 Ok(Projector {
1897 writer_to_reader: self.rec.writer_to_reader.clone(),
1898 skip_decoders,
1899 field_defaults,
1900 default_injections: default_injections.into(),
1901 })
1902 }
1903}
1904
1905impl Projector {
1906 #[inline]
1907 fn project_default(&self, decoder: &mut Decoder, index: usize) -> Result<(), AvroError> {
1908 if let Some(default_literal) = self.field_defaults[index].as_ref() {
1915 decoder.append_default(default_literal)
1916 } else {
1917 decoder.append_null()
1918 }
1919 }
1920
1921 #[inline]
1922 fn project_record(
1923 &mut self,
1924 buf: &mut AvroCursor<'_>,
1925 encodings: &mut [Decoder],
1926 ) -> Result<(), AvroError> {
1927 debug_assert_eq!(
1928 self.writer_to_reader.len(),
1929 self.skip_decoders.len(),
1930 "internal invariant: mapping and skipper lists must have equal length"
1931 );
1932 for (i, (mapping, skipper_opt)) in self
1933 .writer_to_reader
1934 .iter()
1935 .zip(self.skip_decoders.iter_mut())
1936 .enumerate()
1937 {
1938 match (mapping, skipper_opt.as_mut()) {
1939 (Some(reader_index), _) => encodings[*reader_index].decode(buf)?,
1940 (None, Some(skipper)) => skipper.skip(buf)?,
1941 (None, None) => {
1942 return Err(AvroError::SchemaError(format!(
1943 "No skipper available for writer-only field at index {i}",
1944 )));
1945 }
1946 }
1947 }
1948 for (reader_index, lit) in self.default_injections.as_ref() {
1949 encodings[*reader_index].append_default(lit)?;
1950 }
1951 Ok(())
1952 }
1953}
1954
/// Consumes, without materializing, one encoded value of a writer-only field.
#[derive(Debug)]
enum Skipper {
    Null,
    Boolean,
    /// Any type read as an Avro int (int, date, time-millis).
    Int32,
    /// Any type read as an Avro long.
    Int64,
    Float32,
    Float64,
    Bytes,
    String,
    TimeMicros,
    TimestampMillis,
    TimestampMicros,
    TimestampNanos,
    /// A fixed value of the given byte length.
    Fixed(usize),
    /// A decimal stored as `fixed` of the given size, or as `bytes` when `None`.
    Decimal(Option<usize>),
    /// A UUID, skipped as length-prefixed bytes.
    UuidString,
    /// An enum symbol index, read as an int.
    Enum,
    /// A 12-byte fixed duration.
    DurationFixed12,
    /// An array whose items are skipped with the inner skipper.
    List(Box<Skipper>),
    /// A map: each entry is a key skipped as bytes plus a value skipped by the inner skipper.
    Map(Box<Skipper>),
    /// A record, skipped field by field.
    Struct(Vec<Skipper>),
    /// A union: branch tag, then the selected branch.
    Union(Vec<Skipper>),
    /// A nullable value: tag (interpreted per `Nullability`), then the inner value if present.
    Nullable(Nullability, Box<Skipper>),
    #[cfg(feature = "avro_custom_types")]
    RunEndEncoded(Box<Skipper>),
}
1987
1988impl Skipper {
1989 fn from_avro(dt: &AvroDataType) -> Result<Self, AvroError> {
1990 let mut base = match dt.codec() {
1991 Codec::Null => Self::Null,
1992 Codec::Boolean => Self::Boolean,
1993 Codec::Int32 | Codec::Date32 | Codec::TimeMillis => Self::Int32,
1994 Codec::Int64 => Self::Int64,
1995 Codec::TimeMicros => Self::TimeMicros,
1996 Codec::TimestampMillis(_) => Self::TimestampMillis,
1997 Codec::TimestampMicros(_) => Self::TimestampMicros,
1998 Codec::TimestampNanos(_) => Self::TimestampNanos,
1999 #[cfg(feature = "avro_custom_types")]
2000 Codec::DurationNanos
2001 | Codec::DurationMicros
2002 | Codec::DurationMillis
2003 | Codec::DurationSeconds => Self::Int64,
2004 Codec::Float32 => Self::Float32,
2005 Codec::Float64 => Self::Float64,
2006 Codec::Binary => Self::Bytes,
2007 Codec::Utf8 | Codec::Utf8View => Self::String,
2008 Codec::Fixed(sz) => Self::Fixed(*sz as usize),
2009 Codec::Decimal(_, _, size) => Self::Decimal(*size),
2010 Codec::Uuid => Self::UuidString, Codec::Enum(_) => Self::Enum,
2012 Codec::List(item) => Self::List(Box::new(Skipper::from_avro(item)?)),
2013 Codec::Struct(fields) => Self::Struct(
2014 fields
2015 .iter()
2016 .map(|f| Skipper::from_avro(f.data_type()))
2017 .collect::<Result<_, _>>()?,
2018 ),
2019 Codec::Map(values) => Self::Map(Box::new(Skipper::from_avro(values)?)),
2020 Codec::Interval => Self::DurationFixed12,
2021 Codec::Union(encodings, _, _) => {
2022 let max_addr = (i32::MAX as usize) + 1;
2023 if encodings.len() > max_addr {
2024 return Err(AvroError::SchemaError(format!(
2025 "Writer union has {} branches, which exceeds the maximum addressable \
2026 branches by an Avro int tag ({} + 1).",
2027 encodings.len(),
2028 i32::MAX
2029 )));
2030 }
2031 Self::Union(
2032 encodings
2033 .iter()
2034 .map(Skipper::from_avro)
2035 .collect::<Result<_, _>>()?,
2036 )
2037 }
2038 #[cfg(feature = "avro_custom_types")]
2039 Codec::RunEndEncoded(inner, _w) => {
2040 Self::RunEndEncoded(Box::new(Skipper::from_avro(inner)?))
2041 }
2042 };
2043 if let Some(n) = dt.nullability() {
2044 base = Self::Nullable(n, Box::new(base));
2045 }
2046 Ok(base)
2047 }
2048
2049 fn skip(&mut self, buf: &mut AvroCursor<'_>) -> Result<(), AvroError> {
2050 match self {
2051 Self::Null => Ok(()),
2052 Self::Boolean => {
2053 buf.get_bool()?;
2054 Ok(())
2055 }
2056 Self::Int32 => {
2057 buf.get_int()?;
2058 Ok(())
2059 }
2060 Self::Int64
2061 | Self::TimeMicros
2062 | Self::TimestampMillis
2063 | Self::TimestampMicros
2064 | Self::TimestampNanos => {
2065 buf.get_long()?;
2066 Ok(())
2067 }
2068 Self::Float32 => {
2069 buf.get_float()?;
2070 Ok(())
2071 }
2072 Self::Float64 => {
2073 buf.get_double()?;
2074 Ok(())
2075 }
2076 Self::Bytes | Self::String | Self::UuidString => {
2077 buf.get_bytes()?;
2078 Ok(())
2079 }
2080 Self::Fixed(sz) => {
2081 buf.get_fixed(*sz)?;
2082 Ok(())
2083 }
2084 Self::Decimal(size) => {
2085 if let Some(s) = size {
2086 buf.get_fixed(*s)
2087 } else {
2088 buf.get_bytes()
2089 }?;
2090 Ok(())
2091 }
2092 Self::Enum => {
2093 buf.get_int()?;
2094 Ok(())
2095 }
2096 Self::DurationFixed12 => {
2097 buf.get_fixed(12)?;
2098 Ok(())
2099 }
2100 Self::List(item) => {
2101 skip_blocks(buf, |c| item.skip(c))?;
2102 Ok(())
2103 }
2104 Self::Map(value) => {
2105 skip_blocks(buf, |c| {
2106 c.get_bytes()?; value.skip(c)
2108 })?;
2109 Ok(())
2110 }
2111 Self::Struct(fields) => {
2112 for f in fields.iter_mut() {
2113 f.skip(buf)?
2114 }
2115 Ok(())
2116 }
2117 Self::Union(encodings) => {
2118 let raw = buf.get_long()?;
2120 if raw < 0 {
2121 return Err(AvroError::ParseError(format!(
2122 "Negative union branch index {raw}"
2123 )));
2124 }
2125 let idx: usize = usize::try_from(raw).map_err(|_| {
2126 AvroError::ParseError(format!(
2127 "Union branch index {raw} does not fit into usize on this platform ({}-bit)",
2128 (usize::BITS as usize)
2129 ))
2130 })?;
2131 let Some(encoding) = encodings.get_mut(idx) else {
2132 return Err(AvroError::ParseError(format!(
2133 "Union branch index {idx} out of range for skipper ({} branches)",
2134 encodings.len()
2135 )));
2136 };
2137 encoding.skip(buf)
2138 }
2139 Self::Nullable(order, inner) => {
2140 let branch = buf.read_vlq()?;
2141 let is_not_null = match *order {
2142 Nullability::NullFirst => branch != 0,
2143 Nullability::NullSecond => branch == 0,
2144 };
2145 if is_not_null {
2146 inner.skip(buf)?;
2147 }
2148 Ok(())
2149 }
2150 #[cfg(feature = "avro_custom_types")]
2151 Self::RunEndEncoded(inner) => inner.skip(buf),
2152 }
2153 }
2154}
2155
2156#[cfg(test)]
2157mod tests {
2158 use super::*;
2159 use crate::codec::AvroFieldBuilder;
2160 use crate::schema::{Attributes, ComplexType, Field, PrimitiveType, Record, Schema, TypeName};
2161 use arrow_array::cast::AsArray;
2162 use indexmap::IndexMap;
2163 use std::collections::HashMap;
2164
2165 fn encode_avro_int(value: i32) -> Vec<u8> {
2166 let mut buf = Vec::new();
2167 let mut v = (value << 1) ^ (value >> 31);
2168 while v & !0x7F != 0 {
2169 buf.push(((v & 0x7F) | 0x80) as u8);
2170 v >>= 7;
2171 }
2172 buf.push(v as u8);
2173 buf
2174 }
2175
2176 fn encode_avro_long(value: i64) -> Vec<u8> {
2177 let mut buf = Vec::new();
2178 let mut v = (value << 1) ^ (value >> 63);
2179 while v & !0x7F != 0 {
2180 buf.push(((v & 0x7F) | 0x80) as u8);
2181 v >>= 7;
2182 }
2183 buf.push(v as u8);
2184 buf
2185 }
2186
2187 fn encode_avro_bytes(bytes: &[u8]) -> Vec<u8> {
2188 let mut buf = encode_avro_long(bytes.len() as i64);
2189 buf.extend_from_slice(bytes);
2190 buf
2191 }
2192
2193 fn avro_from_codec(codec: Codec) -> AvroDataType {
2194 AvroDataType::new(codec, Default::default(), None)
2195 }
2196
2197 fn resolved_root_datatype(
2198 writer: Schema<'static>,
2199 reader: Schema<'static>,
2200 use_utf8view: bool,
2201 strict_mode: bool,
2202 ) -> AvroDataType {
2203 let writer_record = Schema::Complex(ComplexType::Record(Record {
2205 name: "Root",
2206 namespace: None,
2207 doc: None,
2208 aliases: vec![],
2209 fields: vec![Field {
2210 name: "v",
2211 r#type: writer,
2212 default: None,
2213 doc: None,
2214 aliases: vec![],
2215 }],
2216 attributes: Attributes::default(),
2217 }));
2218
2219 let reader_record = Schema::Complex(ComplexType::Record(Record {
2221 name: "Root",
2222 namespace: None,
2223 doc: None,
2224 aliases: vec![],
2225 fields: vec![Field {
2226 name: "v",
2227 r#type: reader,
2228 default: None,
2229 doc: None,
2230 aliases: vec![],
2231 }],
2232 attributes: Attributes::default(),
2233 }));
2234
2235 let field = AvroFieldBuilder::new(&writer_record)
2237 .with_reader_schema(&reader_record)
2238 .with_utf8view(use_utf8view)
2239 .with_strict_mode(strict_mode)
2240 .build()
2241 .expect("schema resolution should succeed");
2242
2243 match field.data_type().codec() {
2244 Codec::Struct(fields) => fields[0].data_type().clone(),
2245 other => panic!("expected wrapper struct, got {other:?}"),
2246 }
2247 }
2248
2249 fn decoder_for_promotion(
2250 writer: PrimitiveType,
2251 reader: PrimitiveType,
2252 use_utf8view: bool,
2253 ) -> Decoder {
2254 let ws = Schema::TypeName(TypeName::Primitive(writer));
2255 let rs = Schema::TypeName(TypeName::Primitive(reader));
2256 let dt = resolved_root_datatype(ws, rs, use_utf8view, false);
2257 Decoder::try_new(&dt).unwrap()
2258 }
2259
2260 fn make_avro_dt(codec: Codec, nullability: Option<Nullability>) -> AvroDataType {
2261 AvroDataType::new(codec, HashMap::new(), nullability)
2262 }
2263
2264 #[cfg(feature = "avro_custom_types")]
2265 fn encode_vlq_u64(mut x: u64) -> Vec<u8> {
2266 let mut out = Vec::with_capacity(10);
2267 while x >= 0x80 {
2268 out.push((x as u8) | 0x80);
2269 x >>= 7;
2270 }
2271 out.push(x as u8);
2272 out
2273 }
2274
2275 #[test]
2276 fn test_union_resolution_writer_union_reader_union_reorder_and_promotion_dense() {
2277 let ws = Schema::Union(vec![
2278 Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
2279 Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2280 ]);
2281 let rs = Schema::Union(vec![
2282 Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2283 Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
2284 ]);
2285
2286 let dt = resolved_root_datatype(ws, rs, false, false);
2287 let mut dec = Decoder::try_new(&dt).unwrap();
2288
2289 let mut rec1 = encode_avro_long(0);
2290 rec1.extend(encode_avro_int(7));
2291 let mut cur1 = AvroCursor::new(&rec1);
2292 dec.decode(&mut cur1).unwrap();
2293
2294 let mut rec2 = encode_avro_long(1);
2295 rec2.extend(encode_avro_bytes("abc".as_bytes()));
2296 let mut cur2 = AvroCursor::new(&rec2);
2297 dec.decode(&mut cur2).unwrap();
2298
2299 let arr = dec.flush(None).unwrap();
2300 let ua = arr
2301 .as_any()
2302 .downcast_ref::<UnionArray>()
2303 .expect("dense union output");
2304
2305 assert_eq!(
2306 ua.type_id(0),
2307 1,
2308 "first value must select reader 'long' branch"
2309 );
2310 assert_eq!(ua.value_offset(0), 0);
2311
2312 assert_eq!(
2313 ua.type_id(1),
2314 0,
2315 "second value must select reader 'string' branch"
2316 );
2317 assert_eq!(ua.value_offset(1), 0);
2318
2319 let long_child = ua.child(1).as_any().downcast_ref::<Int64Array>().unwrap();
2320 assert_eq!(long_child.len(), 1);
2321 assert_eq!(long_child.value(0), 7);
2322
2323 let str_child = ua.child(0).as_any().downcast_ref::<StringArray>().unwrap();
2324 assert_eq!(str_child.len(), 1);
2325 assert_eq!(str_child.value(0), "abc");
2326 }
2327
2328 #[test]
2329 fn test_union_resolution_writer_union_reader_nonunion_promotion_int_to_long() {
2330 let ws = Schema::Union(vec![
2331 Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
2332 Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2333 ]);
2334 let rs = Schema::TypeName(TypeName::Primitive(PrimitiveType::Long));
2335
2336 let dt = resolved_root_datatype(ws, rs, false, false);
2337 let mut dec = Decoder::try_new(&dt).unwrap();
2338
2339 let mut data = encode_avro_long(0);
2340 data.extend(encode_avro_int(5));
2341 let mut cur = AvroCursor::new(&data);
2342 dec.decode(&mut cur).unwrap();
2343
2344 let arr = dec.flush(None).unwrap();
2345 let out = arr.as_any().downcast_ref::<Int64Array>().unwrap();
2346 assert_eq!(out.len(), 1);
2347 assert_eq!(out.value(0), 5);
2348 }
2349
2350 #[test]
2351 fn test_union_resolution_writer_union_reader_nonunion_mismatch_errors() {
2352 let ws = Schema::Union(vec![
2353 Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
2354 Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2355 ]);
2356 let rs = Schema::TypeName(TypeName::Primitive(PrimitiveType::Long));
2357
2358 let dt = resolved_root_datatype(ws, rs, false, false);
2359 let mut dec = Decoder::try_new(&dt).unwrap();
2360
2361 let mut data = encode_avro_long(1);
2362 data.extend(encode_avro_bytes("z".as_bytes()));
2363 let mut cur = AvroCursor::new(&data);
2364 let res = dec.decode(&mut cur);
2365 assert!(
2366 res.is_err(),
2367 "expected error when writer union branch does not resolve to reader non-union type"
2368 );
2369 }
2370
2371 #[test]
2372 fn test_union_resolution_writer_nonunion_reader_union_selects_matching_branch() {
2373 let ws = Schema::TypeName(TypeName::Primitive(PrimitiveType::Int));
2374 let rs = Schema::Union(vec![
2375 Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2376 Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
2377 ]);
2378
2379 let dt = resolved_root_datatype(ws, rs, false, false);
2380 let mut dec = Decoder::try_new(&dt).unwrap();
2381
2382 let data = encode_avro_int(6);
2383 let mut cur = AvroCursor::new(&data);
2384 dec.decode(&mut cur).unwrap();
2385
2386 let arr = dec.flush(None).unwrap();
2387 let ua = arr
2388 .as_any()
2389 .downcast_ref::<UnionArray>()
2390 .expect("dense union output");
2391 assert_eq!(ua.len(), 1);
2392 assert_eq!(
2393 ua.type_id(0),
2394 1,
2395 "must resolve to reader 'long' branch (type_id 1)"
2396 );
2397 assert_eq!(ua.value_offset(0), 0);
2398
2399 let long_child = ua.child(1).as_any().downcast_ref::<Int64Array>().unwrap();
2400 assert_eq!(long_child.len(), 1);
2401 assert_eq!(long_child.value(0), 6);
2402
2403 let str_child = ua.child(0).as_any().downcast_ref::<StringArray>().unwrap();
2404 assert_eq!(str_child.len(), 0, "string branch must be empty");
2405 }
2406
2407 #[test]
2408 fn test_union_resolution_writer_union_reader_union_unmapped_branch_errors() {
2409 let ws = Schema::Union(vec![
2410 Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
2411 Schema::TypeName(TypeName::Primitive(PrimitiveType::Boolean)),
2412 ]);
2413 let rs = Schema::Union(vec![
2414 Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2415 Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
2416 ]);
2417
2418 let dt = resolved_root_datatype(ws, rs, false, false);
2419 let mut dec = Decoder::try_new(&dt).unwrap();
2420
2421 let mut data = encode_avro_long(1);
2422 data.push(1);
2423 let mut cur = AvroCursor::new(&data);
2424 let res = dec.decode(&mut cur);
2425 assert!(
2426 res.is_err(),
2427 "expected error for unmapped writer 'boolean' branch"
2428 );
2429 }
2430
2431 #[test]
2432 fn test_schema_resolution_promotion_int_to_long() {
2433 let mut dec = decoder_for_promotion(PrimitiveType::Int, PrimitiveType::Long, false);
2434 assert!(matches!(dec, Decoder::Int32ToInt64(_)));
2435 for v in [0, 1, -2, 123456] {
2436 let data = encode_avro_int(v);
2437 let mut cur = AvroCursor::new(&data);
2438 dec.decode(&mut cur).unwrap();
2439 }
2440 let arr = dec.flush(None).unwrap();
2441 let a = arr.as_any().downcast_ref::<Int64Array>().unwrap();
2442 assert_eq!(a.value(0), 0);
2443 assert_eq!(a.value(1), 1);
2444 assert_eq!(a.value(2), -2);
2445 assert_eq!(a.value(3), 123456);
2446 }
2447
2448 #[test]
2449 fn test_schema_resolution_promotion_int_to_float() {
2450 let mut dec = decoder_for_promotion(PrimitiveType::Int, PrimitiveType::Float, false);
2451 assert!(matches!(dec, Decoder::Int32ToFloat32(_)));
2452 for v in [0, 42, -7] {
2453 let data = encode_avro_int(v);
2454 let mut cur = AvroCursor::new(&data);
2455 dec.decode(&mut cur).unwrap();
2456 }
2457 let arr = dec.flush(None).unwrap();
2458 let a = arr.as_any().downcast_ref::<Float32Array>().unwrap();
2459 assert_eq!(a.value(0), 0.0);
2460 assert_eq!(a.value(1), 42.0);
2461 assert_eq!(a.value(2), -7.0);
2462 }
2463
2464 #[test]
2465 fn test_schema_resolution_promotion_int_to_double() {
2466 let mut dec = decoder_for_promotion(PrimitiveType::Int, PrimitiveType::Double, false);
2467 assert!(matches!(dec, Decoder::Int32ToFloat64(_)));
2468 for v in [1, -1, 10_000] {
2469 let data = encode_avro_int(v);
2470 let mut cur = AvroCursor::new(&data);
2471 dec.decode(&mut cur).unwrap();
2472 }
2473 let arr = dec.flush(None).unwrap();
2474 let a = arr.as_any().downcast_ref::<Float64Array>().unwrap();
2475 assert_eq!(a.value(0), 1.0);
2476 assert_eq!(a.value(1), -1.0);
2477 assert_eq!(a.value(2), 10_000.0);
2478 }
2479
2480 #[test]
2481 fn test_schema_resolution_promotion_long_to_float() {
2482 let mut dec = decoder_for_promotion(PrimitiveType::Long, PrimitiveType::Float, false);
2483 assert!(matches!(dec, Decoder::Int64ToFloat32(_)));
2484 for v in [0_i64, 1_000_000_i64, -123_i64] {
2485 let data = encode_avro_long(v);
2486 let mut cur = AvroCursor::new(&data);
2487 dec.decode(&mut cur).unwrap();
2488 }
2489 let arr = dec.flush(None).unwrap();
2490 let a = arr.as_any().downcast_ref::<Float32Array>().unwrap();
2491 assert_eq!(a.value(0), 0.0);
2492 assert_eq!(a.value(1), 1_000_000.0);
2493 assert_eq!(a.value(2), -123.0);
2494 }
2495
2496 #[test]
2497 fn test_schema_resolution_promotion_long_to_double() {
2498 let mut dec = decoder_for_promotion(PrimitiveType::Long, PrimitiveType::Double, false);
2499 assert!(matches!(dec, Decoder::Int64ToFloat64(_)));
2500 for v in [2_i64, -2_i64, 9_223_372_i64] {
2501 let data = encode_avro_long(v);
2502 let mut cur = AvroCursor::new(&data);
2503 dec.decode(&mut cur).unwrap();
2504 }
2505 let arr = dec.flush(None).unwrap();
2506 let a = arr.as_any().downcast_ref::<Float64Array>().unwrap();
2507 assert_eq!(a.value(0), 2.0);
2508 assert_eq!(a.value(1), -2.0);
2509 assert_eq!(a.value(2), 9_223_372.0);
2510 }
2511
2512 #[test]
2513 fn test_schema_resolution_promotion_float_to_double() {
2514 let mut dec = decoder_for_promotion(PrimitiveType::Float, PrimitiveType::Double, false);
2515 assert!(matches!(dec, Decoder::Float32ToFloat64(_)));
2516 for v in [0.5_f32, -3.25_f32, 1.0e6_f32] {
2517 let data = v.to_le_bytes().to_vec();
2518 let mut cur = AvroCursor::new(&data);
2519 dec.decode(&mut cur).unwrap();
2520 }
2521 let arr = dec.flush(None).unwrap();
2522 let a = arr.as_any().downcast_ref::<Float64Array>().unwrap();
2523 assert_eq!(a.value(0), 0.5_f64);
2524 assert_eq!(a.value(1), -3.25_f64);
2525 assert_eq!(a.value(2), 1.0e6_f64);
2526 }
2527
2528 #[test]
2529 fn test_schema_resolution_promotion_bytes_to_string_utf8() {
2530 let mut dec = decoder_for_promotion(PrimitiveType::Bytes, PrimitiveType::String, false);
2531 assert!(matches!(dec, Decoder::BytesToString(_, _)));
2532 for s in ["hello", "world", "héllo"] {
2533 let data = encode_avro_bytes(s.as_bytes());
2534 let mut cur = AvroCursor::new(&data);
2535 dec.decode(&mut cur).unwrap();
2536 }
2537 let arr = dec.flush(None).unwrap();
2538 let a = arr.as_any().downcast_ref::<StringArray>().unwrap();
2539 assert_eq!(a.value(0), "hello");
2540 assert_eq!(a.value(1), "world");
2541 assert_eq!(a.value(2), "héllo");
2542 }
2543
2544 #[test]
2545 fn test_schema_resolution_promotion_bytes_to_string_utf8view_enabled() {
2546 let mut dec = decoder_for_promotion(PrimitiveType::Bytes, PrimitiveType::String, true);
2547 assert!(matches!(dec, Decoder::BytesToString(_, _)));
2548 let data = encode_avro_bytes("abc".as_bytes());
2549 let mut cur = AvroCursor::new(&data);
2550 dec.decode(&mut cur).unwrap();
2551 let arr = dec.flush(None).unwrap();
2552 let a = arr.as_any().downcast_ref::<StringArray>().unwrap();
2553 assert_eq!(a.value(0), "abc");
2554 }
2555
2556 #[test]
2557 fn test_schema_resolution_promotion_string_to_bytes() {
2558 let mut dec = decoder_for_promotion(PrimitiveType::String, PrimitiveType::Bytes, false);
2559 assert!(matches!(dec, Decoder::StringToBytes(_, _)));
2560 for s in ["", "abc", "data"] {
2561 let data = encode_avro_bytes(s.as_bytes());
2562 let mut cur = AvroCursor::new(&data);
2563 dec.decode(&mut cur).unwrap();
2564 }
2565 let arr = dec.flush(None).unwrap();
2566 let a = arr.as_any().downcast_ref::<BinaryArray>().unwrap();
2567 assert_eq!(a.value(0), b"");
2568 assert_eq!(a.value(1), b"abc");
2569 assert_eq!(a.value(2), "data".as_bytes());
2570 }
2571
2572 #[test]
2573 fn test_schema_resolution_no_promotion_passthrough_int() {
2574 let ws = Schema::TypeName(TypeName::Primitive(PrimitiveType::Int));
2575 let rs = Schema::TypeName(TypeName::Primitive(PrimitiveType::Int));
2576 let writer_record = Schema::Complex(ComplexType::Record(Record {
2578 name: "Root",
2579 namespace: None,
2580 doc: None,
2581 aliases: vec![],
2582 fields: vec![Field {
2583 name: "v",
2584 r#type: ws,
2585 default: None,
2586 doc: None,
2587 aliases: vec![],
2588 }],
2589 attributes: Attributes::default(),
2590 }));
2591 let reader_record = Schema::Complex(ComplexType::Record(Record {
2592 name: "Root",
2593 namespace: None,
2594 doc: None,
2595 aliases: vec![],
2596 fields: vec![Field {
2597 name: "v",
2598 r#type: rs,
2599 default: None,
2600 doc: None,
2601 aliases: vec![],
2602 }],
2603 attributes: Attributes::default(),
2604 }));
2605 let field = AvroFieldBuilder::new(&writer_record)
2606 .with_reader_schema(&reader_record)
2607 .with_utf8view(false)
2608 .with_strict_mode(false)
2609 .build()
2610 .unwrap();
2611 let dt = match field.data_type().codec() {
2613 Codec::Struct(fields) => fields[0].data_type().clone(),
2614 other => panic!("expected wrapper struct, got {other:?}"),
2615 };
2616 let mut dec = Decoder::try_new(&dt).unwrap();
2617 assert!(matches!(dec, Decoder::Int32(_)));
2618 for v in [7, -9] {
2619 let data = encode_avro_int(v);
2620 let mut cur = AvroCursor::new(&data);
2621 dec.decode(&mut cur).unwrap();
2622 }
2623 let arr = dec.flush(None).unwrap();
2624 let a = arr.as_any().downcast_ref::<Int32Array>().unwrap();
2625 assert_eq!(a.value(0), 7);
2626 assert_eq!(a.value(1), -9);
2627 }
2628
2629 #[test]
2630 fn test_schema_resolution_illegal_promotion_int_to_boolean_errors() {
2631 let ws = Schema::TypeName(TypeName::Primitive(PrimitiveType::Int));
2632 let rs = Schema::TypeName(TypeName::Primitive(PrimitiveType::Boolean));
2633 let writer_record = Schema::Complex(ComplexType::Record(Record {
2634 name: "Root",
2635 namespace: None,
2636 doc: None,
2637 aliases: vec![],
2638 fields: vec![Field {
2639 name: "v",
2640 r#type: ws,
2641 default: None,
2642 doc: None,
2643 aliases: vec![],
2644 }],
2645 attributes: Attributes::default(),
2646 }));
2647 let reader_record = Schema::Complex(ComplexType::Record(Record {
2648 name: "Root",
2649 namespace: None,
2650 doc: None,
2651 aliases: vec![],
2652 fields: vec![Field {
2653 name: "v",
2654 r#type: rs,
2655 default: None,
2656 doc: None,
2657 aliases: vec![],
2658 }],
2659 attributes: Attributes::default(),
2660 }));
2661 let res = AvroFieldBuilder::new(&writer_record)
2662 .with_reader_schema(&reader_record)
2663 .with_utf8view(false)
2664 .with_strict_mode(false)
2665 .build();
2666 assert!(res.is_err(), "expected error for illegal promotion");
2667 }
2668
2669 #[test]
2670 fn test_map_decoding_one_entry() {
2671 let value_type = avro_from_codec(Codec::Utf8);
2672 let map_type = avro_from_codec(Codec::Map(Arc::new(value_type)));
2673 let mut decoder = Decoder::try_new(&map_type).unwrap();
2674 let mut data = Vec::new();
2676 data.extend_from_slice(&encode_avro_long(1));
2677 data.extend_from_slice(&encode_avro_bytes(b"hello")); data.extend_from_slice(&encode_avro_bytes(b"world")); data.extend_from_slice(&encode_avro_long(0));
2680 let mut cursor = AvroCursor::new(&data);
2681 decoder.decode(&mut cursor).unwrap();
2682 let array = decoder.flush(None).unwrap();
2683 let map_arr = array.as_any().downcast_ref::<MapArray>().unwrap();
2684 assert_eq!(map_arr.len(), 1); assert_eq!(map_arr.value_length(0), 1);
2686 let entries = map_arr.value(0);
2687 let struct_entries = entries.as_any().downcast_ref::<StructArray>().unwrap();
2688 assert_eq!(struct_entries.len(), 1);
2689 let key_arr = struct_entries
2690 .column_by_name("key")
2691 .unwrap()
2692 .as_any()
2693 .downcast_ref::<StringArray>()
2694 .unwrap();
2695 let val_arr = struct_entries
2696 .column_by_name("value")
2697 .unwrap()
2698 .as_any()
2699 .downcast_ref::<StringArray>()
2700 .unwrap();
2701 assert_eq!(key_arr.value(0), "hello");
2702 assert_eq!(val_arr.value(0), "world");
2703 }
2704
2705 #[test]
2706 fn test_map_decoding_empty() {
2707 let value_type = avro_from_codec(Codec::Utf8);
2708 let map_type = avro_from_codec(Codec::Map(Arc::new(value_type)));
2709 let mut decoder = Decoder::try_new(&map_type).unwrap();
2710 let data = encode_avro_long(0);
2711 decoder.decode(&mut AvroCursor::new(&data)).unwrap();
2712 let array = decoder.flush(None).unwrap();
2713 let map_arr = array.as_any().downcast_ref::<MapArray>().unwrap();
2714 assert_eq!(map_arr.len(), 1);
2715 assert_eq!(map_arr.value_length(0), 0);
2716 }
2717
2718 #[test]
2719 fn test_fixed_decoding() {
2720 let avro_type = avro_from_codec(Codec::Fixed(3));
2721 let mut decoder = Decoder::try_new(&avro_type).expect("Failed to create decoder");
2722
2723 let data1 = [1u8, 2, 3];
2724 let mut cursor1 = AvroCursor::new(&data1);
2725 decoder
2726 .decode(&mut cursor1)
2727 .expect("Failed to decode data1");
2728 assert_eq!(cursor1.position(), 3, "Cursor should advance by fixed size");
2729 let data2 = [4u8, 5, 6];
2730 let mut cursor2 = AvroCursor::new(&data2);
2731 decoder
2732 .decode(&mut cursor2)
2733 .expect("Failed to decode data2");
2734 assert_eq!(cursor2.position(), 3, "Cursor should advance by fixed size");
2735 let array = decoder.flush(None).expect("Failed to flush decoder");
2736 assert_eq!(array.len(), 2, "Array should contain two items");
2737 let fixed_size_binary_array = array
2738 .as_any()
2739 .downcast_ref::<FixedSizeBinaryArray>()
2740 .expect("Failed to downcast to FixedSizeBinaryArray");
2741 assert_eq!(
2742 fixed_size_binary_array.value_length(),
2743 3,
2744 "Fixed size of binary values should be 3"
2745 );
2746 assert_eq!(
2747 fixed_size_binary_array.value(0),
2748 &[1, 2, 3],
2749 "First item mismatch"
2750 );
2751 assert_eq!(
2752 fixed_size_binary_array.value(1),
2753 &[4, 5, 6],
2754 "Second item mismatch"
2755 );
2756 }
2757
2758 #[test]
2759 fn test_fixed_decoding_empty() {
2760 let avro_type = avro_from_codec(Codec::Fixed(5));
2761 let mut decoder = Decoder::try_new(&avro_type).expect("Failed to create decoder");
2762
2763 let array = decoder
2764 .flush(None)
2765 .expect("Failed to flush decoder for empty input");
2766
2767 assert_eq!(array.len(), 0, "Array should be empty");
2768 let fixed_size_binary_array = array
2769 .as_any()
2770 .downcast_ref::<FixedSizeBinaryArray>()
2771 .expect("Failed to downcast to FixedSizeBinaryArray for empty array");
2772
2773 assert_eq!(
2774 fixed_size_binary_array.value_length(),
2775 5,
2776 "Fixed size of binary values should be 5 as per type"
2777 );
2778 }
2779
2780 #[test]
2781 fn test_uuid_decoding() {
2782 let avro_type = avro_from_codec(Codec::Uuid);
2783 let mut decoder = Decoder::try_new(&avro_type).expect("Failed to create decoder");
2784 let uuid_str = "f81d4fae-7dec-11d0-a765-00a0c91e6bf6";
2785 let data = encode_avro_bytes(uuid_str.as_bytes());
2786 let mut cursor = AvroCursor::new(&data);
2787 decoder.decode(&mut cursor).expect("Failed to decode data");
2788 assert_eq!(
2789 cursor.position(),
2790 data.len(),
2791 "Cursor should advance by varint size + data size"
2792 );
2793 let array = decoder.flush(None).expect("Failed to flush decoder");
2794 let fixed_size_binary_array = array
2795 .as_any()
2796 .downcast_ref::<FixedSizeBinaryArray>()
2797 .expect("Array should be a FixedSizeBinaryArray");
2798 assert_eq!(fixed_size_binary_array.len(), 1);
2799 assert_eq!(fixed_size_binary_array.value_length(), 16);
2800 let expected_bytes = [
2801 0xf8, 0x1d, 0x4f, 0xae, 0x7d, 0xec, 0x11, 0xd0, 0xa7, 0x65, 0x00, 0xa0, 0xc9, 0x1e,
2802 0x6b, 0xf6,
2803 ];
2804 assert_eq!(fixed_size_binary_array.value(0), &expected_bytes);
2805 }
2806
2807 #[test]
2808 fn test_array_decoding() {
2809 let item_dt = avro_from_codec(Codec::Int32);
2810 let list_dt = avro_from_codec(Codec::List(Arc::new(item_dt)));
2811 let mut decoder = Decoder::try_new(&list_dt).unwrap();
2812 let mut row1 = Vec::new();
2813 row1.extend_from_slice(&encode_avro_long(2));
2814 row1.extend_from_slice(&encode_avro_int(10));
2815 row1.extend_from_slice(&encode_avro_int(20));
2816 row1.extend_from_slice(&encode_avro_long(0));
2817 let row2 = encode_avro_long(0);
2818 let mut cursor = AvroCursor::new(&row1);
2819 decoder.decode(&mut cursor).unwrap();
2820 let mut cursor2 = AvroCursor::new(&row2);
2821 decoder.decode(&mut cursor2).unwrap();
2822 let array = decoder.flush(None).unwrap();
2823 let list_arr = array.as_any().downcast_ref::<ListArray>().unwrap();
2824 assert_eq!(list_arr.len(), 2);
2825 let offsets = list_arr.value_offsets();
2826 assert_eq!(offsets, &[0, 2, 2]);
2827 let values = list_arr.values();
2828 let int_arr = values.as_primitive::<Int32Type>();
2829 assert_eq!(int_arr.len(), 2);
2830 assert_eq!(int_arr.value(0), 10);
2831 assert_eq!(int_arr.value(1), 20);
2832 }
2833
2834 #[test]
2835 fn test_array_decoding_with_negative_block_count() {
2836 let item_dt = avro_from_codec(Codec::Int32);
2837 let list_dt = avro_from_codec(Codec::List(Arc::new(item_dt)));
2838 let mut decoder = Decoder::try_new(&list_dt).unwrap();
2839 let mut data = encode_avro_long(-3);
2840 data.extend_from_slice(&encode_avro_long(12));
2841 data.extend_from_slice(&encode_avro_int(1));
2842 data.extend_from_slice(&encode_avro_int(2));
2843 data.extend_from_slice(&encode_avro_int(3));
2844 data.extend_from_slice(&encode_avro_long(0));
2845 let mut cursor = AvroCursor::new(&data);
2846 decoder.decode(&mut cursor).unwrap();
2847 let array = decoder.flush(None).unwrap();
2848 let list_arr = array.as_any().downcast_ref::<ListArray>().unwrap();
2849 assert_eq!(list_arr.len(), 1);
2850 assert_eq!(list_arr.value_length(0), 3);
2851 let values = list_arr.values().as_primitive::<Int32Type>();
2852 assert_eq!(values.len(), 3);
2853 assert_eq!(values.value(0), 1);
2854 assert_eq!(values.value(1), 2);
2855 assert_eq!(values.value(2), 3);
2856 }
2857
2858 #[test]
2859 fn test_nested_array_decoding() {
2860 let inner_ty = avro_from_codec(Codec::List(Arc::new(avro_from_codec(Codec::Int32))));
2861 let nested_ty = avro_from_codec(Codec::List(Arc::new(inner_ty.clone())));
2862 let mut decoder = Decoder::try_new(&nested_ty).unwrap();
2863 let mut buf = Vec::new();
2864 buf.extend(encode_avro_long(1));
2865 buf.extend(encode_avro_long(2));
2866 buf.extend(encode_avro_int(5));
2867 buf.extend(encode_avro_int(6));
2868 buf.extend(encode_avro_long(0));
2869 buf.extend(encode_avro_long(0));
2870 let mut cursor = AvroCursor::new(&buf);
2871 decoder.decode(&mut cursor).unwrap();
2872 let arr = decoder.flush(None).unwrap();
2873 let outer = arr.as_any().downcast_ref::<ListArray>().unwrap();
2874 assert_eq!(outer.len(), 1);
2875 assert_eq!(outer.value_length(0), 1);
2876 let inner = outer.values().as_any().downcast_ref::<ListArray>().unwrap();
2877 assert_eq!(inner.len(), 1);
2878 assert_eq!(inner.value_length(0), 2);
2879 let values = inner
2880 .values()
2881 .as_any()
2882 .downcast_ref::<Int32Array>()
2883 .unwrap();
2884 assert_eq!(values.values(), &[5, 6]);
2885 }
2886
2887 #[test]
2888 fn test_array_decoding_empty_array() {
2889 let value_type = avro_from_codec(Codec::Utf8);
2890 let map_type = avro_from_codec(Codec::List(Arc::new(value_type)));
2891 let mut decoder = Decoder::try_new(&map_type).unwrap();
2892 let data = encode_avro_long(0);
2893 decoder.decode(&mut AvroCursor::new(&data)).unwrap();
2894 let array = decoder.flush(None).unwrap();
2895 let list_arr = array.as_any().downcast_ref::<ListArray>().unwrap();
2896 assert_eq!(list_arr.len(), 1);
2897 assert_eq!(list_arr.value_length(0), 0);
2898 }
2899
2900 #[test]
2901 fn test_array_decoding_writer_nonunion_items_reader_nullable_items() {
2902 use crate::schema::Array;
2903 let writer_schema = Schema::Complex(ComplexType::Array(Array {
2904 items: Box::new(Schema::TypeName(TypeName::Primitive(PrimitiveType::Int))),
2905 attributes: Attributes::default(),
2906 }));
2907 let reader_schema = Schema::Complex(ComplexType::Array(Array {
2908 items: Box::new(Schema::Union(vec![
2909 Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
2910 Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
2911 ])),
2912 attributes: Attributes::default(),
2913 }));
2914 let dt = resolved_root_datatype(writer_schema, reader_schema, false, false);
2915 if let Codec::List(inner) = dt.codec() {
2916 assert_eq!(
2917 inner.nullability(),
2918 Some(Nullability::NullFirst),
2919 "items should be nullable"
2920 );
2921 } else {
2922 panic!("expected List codec");
2923 }
2924 let mut decoder = Decoder::try_new(&dt).unwrap();
2925 let mut data = encode_avro_long(2);
2926 data.extend(encode_avro_int(10));
2927 data.extend(encode_avro_int(20));
2928 data.extend(encode_avro_long(0));
2929 let mut cursor = AvroCursor::new(&data);
2930 decoder.decode(&mut cursor).unwrap();
2931 assert_eq!(
2932 cursor.position(),
2933 data.len(),
2934 "all bytes should be consumed"
2935 );
2936 let array = decoder.flush(None).unwrap();
2937 let list_arr = array.as_any().downcast_ref::<ListArray>().unwrap();
2938 assert_eq!(list_arr.len(), 1, "one list/row");
2939 assert_eq!(list_arr.value_length(0), 2, "two items in the list");
2940 let values = list_arr.values().as_primitive::<Int32Type>();
2941 assert_eq!(values.len(), 2);
2942 assert_eq!(values.value(0), 10);
2943 assert_eq!(values.value(1), 20);
2944 assert!(!values.is_null(0));
2945 assert!(!values.is_null(1));
2946 }
2947
2948 #[test]
2949 fn test_decimal_decoding_fixed256() {
2950 let dt = avro_from_codec(Codec::Decimal(50, Some(2), Some(32)));
2951 let mut decoder = Decoder::try_new(&dt).unwrap();
2952 let row1 = [
2953 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
2954 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
2955 0x00, 0x00, 0x30, 0x39,
2956 ];
2957 let row2 = [
2958 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
2959 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
2960 0xFF, 0xFF, 0xFF, 0x85,
2961 ];
2962 let mut data = Vec::new();
2963 data.extend_from_slice(&row1);
2964 data.extend_from_slice(&row2);
2965 let mut cursor = AvroCursor::new(&data);
2966 decoder.decode(&mut cursor).unwrap();
2967 decoder.decode(&mut cursor).unwrap();
2968 let arr = decoder.flush(None).unwrap();
2969 let dec = arr.as_any().downcast_ref::<Decimal256Array>().unwrap();
2970 assert_eq!(dec.len(), 2);
2971 assert_eq!(dec.value_as_string(0), "123.45");
2972 assert_eq!(dec.value_as_string(1), "-1.23");
2973 }
2974
2975 #[test]
2976 fn test_decimal_decoding_fixed128() {
2977 let dt = avro_from_codec(Codec::Decimal(28, Some(2), Some(16)));
2978 let mut decoder = Decoder::try_new(&dt).unwrap();
2979 let row1 = [
2980 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
2981 0x30, 0x39,
2982 ];
2983 let row2 = [
2984 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
2985 0xFF, 0x85,
2986 ];
2987 let mut data = Vec::new();
2988 data.extend_from_slice(&row1);
2989 data.extend_from_slice(&row2);
2990 let mut cursor = AvroCursor::new(&data);
2991 decoder.decode(&mut cursor).unwrap();
2992 decoder.decode(&mut cursor).unwrap();
2993 let arr = decoder.flush(None).unwrap();
2994 let dec = arr.as_any().downcast_ref::<Decimal128Array>().unwrap();
2995 assert_eq!(dec.len(), 2);
2996 assert_eq!(dec.value_as_string(0), "123.45");
2997 assert_eq!(dec.value_as_string(1), "-1.23");
2998 }
2999
3000 #[test]
3001 fn test_decimal_decoding_fixed32_from_32byte_fixed_storage() {
3002 let dt = avro_from_codec(Codec::Decimal(5, Some(2), Some(32)));
3003 let mut decoder = Decoder::try_new(&dt).unwrap();
3004 let row1 = [
3005 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
3006 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
3007 0x00, 0x00, 0x30, 0x39,
3008 ];
3009 let row2 = [
3010 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
3011 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
3012 0xFF, 0xFF, 0xFF, 0x85,
3013 ];
3014 let mut data = Vec::new();
3015 data.extend_from_slice(&row1);
3016 data.extend_from_slice(&row2);
3017 let mut cursor = AvroCursor::new(&data);
3018 decoder.decode(&mut cursor).unwrap();
3019 decoder.decode(&mut cursor).unwrap();
3020 let arr = decoder.flush(None).unwrap();
3021 #[cfg(feature = "small_decimals")]
3022 {
3023 let dec = arr.as_any().downcast_ref::<Decimal32Array>().unwrap();
3024 assert_eq!(dec.len(), 2);
3025 assert_eq!(dec.value_as_string(0), "123.45");
3026 assert_eq!(dec.value_as_string(1), "-1.23");
3027 }
3028 #[cfg(not(feature = "small_decimals"))]
3029 {
3030 let dec = arr.as_any().downcast_ref::<Decimal128Array>().unwrap();
3031 assert_eq!(dec.len(), 2);
3032 assert_eq!(dec.value_as_string(0), "123.45");
3033 assert_eq!(dec.value_as_string(1), "-1.23");
3034 }
3035 }
3036
#[test]
fn test_decimal_decoding_fixed32_from_16byte_fixed_storage() {
    // decimal(5, 2) stored in a 16-byte Avro `fixed`; each row is a big-endian
    // two's-complement unscaled integer, sign-extended to the full width.
    let dt = avro_from_codec(Codec::Decimal(5, Some(2), Some(16)));
    let mut decoder = Decoder::try_new(&dt).unwrap();

    // 0x3039 == 12345 -> "123.45"
    let mut positive = [0x00u8; 16];
    positive[14..].copy_from_slice(&[0x30, 0x39]);
    // 0x...FF85 == -123 -> "-1.23"
    let mut negative = [0xFFu8; 16];
    negative[15] = 0x85;

    let data: Vec<u8> = positive.iter().chain(negative.iter()).copied().collect();
    let mut cursor = AvroCursor::new(&data);
    for _ in 0..2 {
        decoder.decode(&mut cursor).unwrap();
    }
    let arr = decoder.flush(None).unwrap();

    // The concrete Arrow decimal width depends on the `small_decimals` feature.
    #[cfg(feature = "small_decimals")]
    type Dec = Decimal32Array;
    #[cfg(not(feature = "small_decimals"))]
    type Dec = Decimal128Array;

    let dec = arr.as_any().downcast_ref::<Dec>().unwrap();
    assert_eq!(dec.len(), 2);
    assert_eq!(dec.value_as_string(0), "123.45");
    assert_eq!(dec.value_as_string(1), "-1.23");
}
3072
#[test]
fn test_decimal_decoding_bytes_with_nulls() {
    // decimal(4, 1) stored as Avro `bytes` inside a nullable union. With
    // NullSecond, union tag 0 selects the value and tag 1 selects null.
    let dt = avro_from_codec(Codec::Decimal(4, Some(1), None));
    let mut decoder = Decoder::Nullable(
        Nullability::NullSecond,
        NullBufferBuilder::new(DEFAULT_CAPACITY),
        Box::new(Decoder::try_new(&dt).unwrap()),
        NullablePlan::ReadTag,
    );

    let data = [
        // Row 0: 0x04D2 == 1234 -> "123.4"
        encode_avro_int(0),
        encode_avro_bytes(&[0x04, 0xD2]),
        // Row 1: null
        encode_avro_int(1),
        // Row 2: 0xFB2E == -1234 -> "-123.4"
        encode_avro_int(0),
        encode_avro_bytes(&[0xFB, 0x2E]),
    ]
    .concat();
    let mut cursor = AvroCursor::new(&data);
    for _ in 0..3 {
        decoder.decode(&mut cursor).unwrap();
    }
    let arr = decoder.flush(None).unwrap();

    // The concrete Arrow decimal width depends on the `small_decimals` feature.
    #[cfg(feature = "small_decimals")]
    type Dec = Decimal32Array;
    #[cfg(not(feature = "small_decimals"))]
    type Dec = Decimal128Array;

    let dec_arr = arr.as_any().downcast_ref::<Dec>().unwrap();
    assert_eq!(dec_arr.len(), 3);
    assert!(dec_arr.is_valid(0));
    assert!(!dec_arr.is_valid(1));
    assert!(dec_arr.is_valid(2));
    assert_eq!(dec_arr.value_as_string(0), "123.4");
    assert_eq!(dec_arr.value_as_string(2), "-123.4");
}
3115
#[test]
fn test_decimal_decoding_bytes_with_nulls_fixed_size_narrow_result() {
    // decimal(6, 2) backed by a 16-byte `fixed`, inside a nullable union
    // (NullSecond: tag 0 selects the value, tag 1 selects null).
    let dt = avro_from_codec(Codec::Decimal(6, Some(2), Some(16)));
    let mut decoder = Decoder::Nullable(
        Nullability::NullSecond,
        NullBufferBuilder::new(DEFAULT_CAPACITY),
        Box::new(Decoder::try_new(&dt).unwrap()),
        NullablePlan::ReadTag,
    );

    // 0x01E240 == 123456 -> "1234.56", zero-extended to 16 bytes.
    let mut positive = [0x00u8; 16];
    positive[13..].copy_from_slice(&[0x01, 0xE2, 0x40]);
    // 0x...FE1DC0 == -123456 -> "-1234.56", sign-extended to 16 bytes.
    let mut negative = [0xFFu8; 16];
    negative[13..].copy_from_slice(&[0xFE, 0x1D, 0xC0]);

    let mut data = encode_avro_int(0);
    data.extend_from_slice(&positive);
    data.extend(encode_avro_int(1));
    data.extend(encode_avro_int(0));
    data.extend_from_slice(&negative);

    let mut cursor = AvroCursor::new(&data);
    for _ in 0..3 {
        decoder.decode(&mut cursor).unwrap();
    }
    let arr = decoder.flush(None).unwrap();

    // The concrete Arrow decimal width depends on the `small_decimals` feature.
    #[cfg(feature = "small_decimals")]
    type Dec = Decimal32Array;
    #[cfg(not(feature = "small_decimals"))]
    type Dec = Decimal128Array;

    let dec_arr = arr.as_any().downcast_ref::<Dec>().unwrap();
    assert_eq!(dec_arr.len(), 3);
    assert!(dec_arr.is_valid(0));
    assert!(!dec_arr.is_valid(1));
    assert!(dec_arr.is_valid(2));
    assert_eq!(dec_arr.value_as_string(0), "1234.56");
    assert_eq!(dec_arr.value_as_string(2), "-1234.56");
}
3166
#[test]
fn test_enum_decoding() {
    let symbols: Arc<[String]> = ["A", "B", "C"].iter().map(|s| s.to_string()).collect();
    let mut decoder = Decoder::try_new(&avro_from_codec(Codec::Enum(symbols))).unwrap();

    // Each row is the zig-zag encoded symbol index.
    let mut data = Vec::new();
    for index in [2, 0, 1] {
        data.extend(encode_avro_int(index));
    }
    let mut cursor = AvroCursor::new(&data);
    for _ in 0..3 {
        decoder.decode(&mut cursor).unwrap();
    }

    let array = decoder.flush(None).unwrap();
    let dict_array = array
        .as_any()
        .downcast_ref::<DictionaryArray<Int32Type>>()
        .unwrap();
    assert_eq!(dict_array.len(), 3);

    // The dictionary values are the symbols in declaration order.
    let values = dict_array
        .values()
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    for (i, symbol) in ["A", "B", "C"].iter().enumerate() {
        assert_eq!(values.value(i), *symbol);
    }
    assert_eq!(dict_array.keys().values(), &[2, 0, 1]);
}
3196
#[test]
fn test_enum_decoding_with_nulls() {
    let symbols: Arc<[String]> = ["X", "Y"].iter().map(|s| s.to_string()).collect();
    let avro_type = AvroDataType::new(
        Codec::Enum(symbols),
        Default::default(),
        Some(Nullability::NullFirst),
    );
    let mut decoder = Decoder::try_new(&avro_type).unwrap();

    // NullFirst: branch 0 is null, branch 1 is followed by the enum index.
    let mut data = encode_avro_long(1);
    data.extend(encode_avro_int(1)); // "Y"
    data.extend(encode_avro_long(0)); // null
    data.extend(encode_avro_long(1));
    data.extend(encode_avro_int(0)); // "X"

    let mut cursor = AvroCursor::new(&data);
    for _ in 0..3 {
        decoder.decode(&mut cursor).unwrap();
    }

    let array = decoder.flush(None).unwrap();
    let dict_array = array
        .as_any()
        .downcast_ref::<DictionaryArray<Int32Type>>()
        .unwrap();
    assert_eq!(dict_array.len(), 3);
    assert!(dict_array.is_valid(0));
    assert!(dict_array.is_null(1));
    assert!(dict_array.is_valid(2));
    assert_eq!(
        dict_array.keys(),
        &Int32Array::from(vec![Some(1), None, Some(0)])
    );

    let values = dict_array
        .values()
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    assert_eq!(values.value(0), "X");
    assert_eq!(values.value(1), "Y");
}
3233
#[test]
fn test_duration_decoding_with_nulls() {
    let avro_type = AvroDataType::new(
        Codec::Interval,
        Default::default(),
        Some(Nullability::NullFirst),
    );
    let mut decoder = Decoder::try_new(&avro_type).unwrap();

    // Builds the 12-byte duration payload: three little-endian u32s
    // (months, days, milliseconds).
    let duration = |months: u32, days: u32, millis: u32| -> Vec<u8> {
        [months, days, millis]
            .iter()
            .flat_map(|part| part.to_le_bytes())
            .collect()
    };

    // NullFirst: branch 1 carries a value, branch 0 is null.
    let mut data = encode_avro_long(1);
    data.extend(duration(1, 2, 3));
    data.extend(encode_avro_long(0));
    data.extend(encode_avro_long(1));
    data.extend(duration(4, 5, 6));

    let mut cursor = AvroCursor::new(&data);
    for _ in 0..3 {
        decoder.decode(&mut cursor).unwrap();
    }

    let array = decoder.flush(None).unwrap();
    let interval_array = array
        .as_any()
        .downcast_ref::<IntervalMonthDayNanoArray>()
        .unwrap();
    assert_eq!(interval_array.len(), 3);
    assert!(interval_array.is_valid(0));
    assert!(interval_array.is_null(1));
    assert!(interval_array.is_valid(2));

    // Milliseconds are widened to nanoseconds in the Arrow interval.
    let first = IntervalMonthDayNano {
        months: 1,
        days: 2,
        nanoseconds: 3_000_000,
    };
    let third = IntervalMonthDayNano {
        months: 4,
        days: 5,
        nanoseconds: 6_000_000,
    };
    let expected = IntervalMonthDayNanoArray::from(vec![Some(first), None, Some(third)]);
    assert_eq!(interval_array, &expected);
}
3287
#[test]
fn test_duration_decoding_empty() {
    // Flushing a decoder that never decoded a row yields an empty array.
    let mut decoder =
        Decoder::try_new(&AvroDataType::new(Codec::Interval, Default::default(), None))
            .unwrap();
    assert_eq!(decoder.flush(None).unwrap().len(), 0);
}
3296
#[test]
#[cfg(feature = "avro_custom_types")]
fn test_duration_seconds_decoding() {
    // Each row is a single zig-zag `long`.
    let avro_type = AvroDataType::new(Codec::DurationSeconds, Default::default(), None);
    let mut decoder = Decoder::try_new(&avro_type).unwrap();

    let expected = [0i64, -1, 2];
    let data: Vec<u8> = expected
        .iter()
        .flat_map(|&v| encode_avro_long(v))
        .collect();
    let mut cursor = AvroCursor::new(&data);
    for _ in 0..expected.len() {
        decoder.decode(&mut cursor).unwrap();
    }

    let array = decoder.flush(None).unwrap();
    let dur = array
        .as_any()
        .downcast_ref::<DurationSecondArray>()
        .unwrap();
    assert_eq!(dur.values(), &expected);
}
3318
#[test]
#[cfg(feature = "avro_custom_types")]
fn test_duration_milliseconds_decoding() {
    // Each row is a single zig-zag `long`.
    let avro_type = AvroDataType::new(Codec::DurationMillis, Default::default(), None);
    let mut decoder = Decoder::try_new(&avro_type).unwrap();

    let expected = [1i64, 0, -2];
    let data: Vec<u8> = expected
        .iter()
        .flat_map(|&v| encode_avro_long(v))
        .collect();
    let mut cursor = AvroCursor::new(&data);
    for _ in 0..expected.len() {
        decoder.decode(&mut cursor).unwrap();
    }

    let array = decoder.flush(None).unwrap();
    let dur = array
        .as_any()
        .downcast_ref::<DurationMillisecondArray>()
        .unwrap();
    assert_eq!(dur.values(), &expected);
}
3339
#[test]
#[cfg(feature = "avro_custom_types")]
fn test_duration_microseconds_decoding() {
    // Each row is a single zig-zag `long`.
    let avro_type = AvroDataType::new(Codec::DurationMicros, Default::default(), None);
    let mut decoder = Decoder::try_new(&avro_type).unwrap();

    let expected = [5i64, -6, 7];
    let data: Vec<u8> = expected
        .iter()
        .flat_map(|&v| encode_avro_long(v))
        .collect();
    let mut cursor = AvroCursor::new(&data);
    for _ in 0..expected.len() {
        decoder.decode(&mut cursor).unwrap();
    }

    let array = decoder.flush(None).unwrap();
    let dur = array
        .as_any()
        .downcast_ref::<DurationMicrosecondArray>()
        .unwrap();
    assert_eq!(dur.values(), &expected);
}
3360
#[test]
#[cfg(feature = "avro_custom_types")]
fn test_duration_nanoseconds_decoding() {
    // Each row is a single zig-zag `long`.
    let avro_type = AvroDataType::new(Codec::DurationNanos, Default::default(), None);
    let mut decoder = Decoder::try_new(&avro_type).unwrap();

    let expected = [8i64, 9, -10];
    let data: Vec<u8> = expected
        .iter()
        .flat_map(|&v| encode_avro_long(v))
        .collect();
    let mut cursor = AvroCursor::new(&data);
    for _ in 0..expected.len() {
        decoder.decode(&mut cursor).unwrap();
    }

    let array = decoder.flush(None).unwrap();
    let dur = array
        .as_any()
        .downcast_ref::<DurationNanosecondArray>()
        .unwrap();
    assert_eq!(dur.values(), &expected);
}
3381
#[test]
fn test_nullable_decode_error_bitmap_corruption() {
    // A nullable row whose payload fails to decode must not leave the null
    // bitmap out of step with the values: the rows decoded around it must keep
    // their own validity.
    // NullSecond: union branch 0 selects the int, branch 1 selects null.
    let avro_type = AvroDataType::new(
        Codec::Int32,
        Default::default(),
        Some(Nullability::NullSecond),
    );
    let mut decoder = Decoder::try_new(&avro_type).unwrap();

    // Row 1: null.
    let row1 = encode_avro_int(1);
    // Row 2: value branch selected, but the int payload is missing (truncated).
    let row2 = encode_avro_int(0);
    // Row 3: value branch with payload 42.
    let mut row3 = encode_avro_int(0);
    row3.extend_from_slice(&encode_avro_int(42));

    decoder.decode(&mut AvroCursor::new(&row1)).unwrap();
    assert!(decoder.decode(&mut AvroCursor::new(&row2)).is_err());
    decoder.decode(&mut AvroCursor::new(&row3)).unwrap();

    let array = decoder.flush(None).unwrap();

    // Only the two successfully decoded rows are present.
    assert_eq!(array.len(), 2);
    let int_array = array.as_any().downcast_ref::<Int32Array>().unwrap();
    assert!(int_array.is_null(0));
    // `value` ignores validity, so the bitmap entry for row 3 is checked explicitly.
    assert!(int_array.is_valid(1));
    assert_eq!(int_array.value(1), 42);
}
3417
#[test]
fn test_enum_mapping_reordered_symbols() {
    // The reader declares ["B", "C", "A"]. `mapping[w]` is the reader index for
    // writer index `w`, so the writer order is presumably ["A", "B", "C"].
    let reader_symbols: Arc<[String]> = ["B", "C", "A"].iter().map(|s| s.to_string()).collect();
    let resolution = EnumResolution {
        mapping: Arc::from(vec![2, 0, 1]),
        default_index: -1,
    };
    let mut dec = Decoder::Enum(
        Vec::with_capacity(DEFAULT_CAPACITY),
        reader_symbols,
        Some(resolution),
    );

    let mut data = Vec::new();
    for writer_index in 0..3 {
        data.extend(encode_avro_int(writer_index));
    }
    let mut cur = AvroCursor::new(&data);
    for _ in 0..3 {
        dec.decode(&mut cur).unwrap();
    }

    let arr = dec.flush(None).unwrap();
    let dict = arr
        .as_any()
        .downcast_ref::<DictionaryArray<Int32Type>>()
        .unwrap();
    assert_eq!(dict.keys(), &Int32Array::from(vec![2, 0, 1]));

    // Dictionary values follow the reader's declaration order.
    let values = dict
        .values()
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    for (i, symbol) in ["B", "C", "A"].iter().enumerate() {
        assert_eq!(values.value(i), *symbol);
    }
}
3456
#[test]
fn test_enum_mapping_unknown_symbol_and_out_of_range_fall_back_to_default() {
    // Reader symbols ["A", "B"] with default index 1 ("B"). Writer indices 0 and 1
    // map directly. Writer index 2 is a symbol unknown to the reader (mapping
    // entry -1). Writer index 99 lies past the end of the mapping. Both of the
    // last two cases must resolve to the default.
    let reader_symbols: Arc<[String]> = vec!["A".to_string(), "B".to_string()].into();
    let default_index: i32 = 1;
    let mapping: Arc<[i32]> = Arc::from(vec![0, 1, -1]);
    let mut dec = Decoder::Enum(
        Vec::with_capacity(DEFAULT_CAPACITY),
        reader_symbols,
        Some(EnumResolution {
            mapping,
            default_index,
        }),
    );

    let writer_indices = [0, 1, 2, 99];
    let mut data = Vec::new();
    for writer_index in writer_indices {
        data.extend_from_slice(&encode_avro_int(writer_index));
    }
    let mut cur = AvroCursor::new(&data);
    for _ in 0..writer_indices.len() {
        dec.decode(&mut cur).unwrap();
    }

    let arr = dec.flush(None).unwrap();
    let dict = arr
        .as_any()
        .downcast_ref::<DictionaryArray<Int32Type>>()
        .unwrap();
    let expected_keys = Int32Array::from(vec![0, 1, 1, 1]);
    assert_eq!(dict.keys(), &expected_keys);
    let values = dict
        .values()
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    assert_eq!(values.value(0), "A");
    assert_eq!(values.value(1), "B");
}
3493
#[test]
fn test_enum_mapping_unknown_symbol_without_default_errors() {
    // The single writer symbol has no reader counterpart (mapping -1) and no
    // default is configured (-1), so decoding must fail.
    let mut dec = Decoder::Enum(
        Vec::with_capacity(DEFAULT_CAPACITY),
        Arc::from(vec!["A".to_string()]),
        Some(EnumResolution {
            mapping: Arc::from(vec![-1]),
            default_index: -1,
        }),
    );

    let data = encode_avro_int(0);
    let msg = dec
        .decode(&mut AvroCursor::new(&data))
        .expect_err("expected decode error for unresolved enum without default")
        .to_string();
    assert!(
        msg.contains("not resolvable") && msg.contains("no default"),
        "unexpected error message: {msg}"
    );
}
3518
3519 fn make_record_resolved_decoder(
3520 reader_fields: &[(&str, DataType, bool)],
3521 writer_to_reader: Vec<Option<usize>>,
3522 skip_decoders: Vec<Option<Skipper>>,
3523 ) -> Decoder {
3524 let mut field_refs: Vec<FieldRef> = Vec::with_capacity(reader_fields.len());
3525 let mut encodings: Vec<Decoder> = Vec::with_capacity(reader_fields.len());
3526 for (name, dt, nullable) in reader_fields {
3527 field_refs.push(Arc::new(ArrowField::new(*name, dt.clone(), *nullable)));
3528 let enc = match dt {
3529 DataType::Int32 => Decoder::Int32(Vec::new()),
3530 DataType::Int64 => Decoder::Int64(Vec::new()),
3531 DataType::Utf8 => {
3532 Decoder::String(OffsetBufferBuilder::new(DEFAULT_CAPACITY), Vec::new())
3533 }
3534 other => panic!("Unsupported test reader field type: {other:?}"),
3535 };
3536 encodings.push(enc);
3537 }
3538 let fields: Fields = field_refs.into();
3539 Decoder::Record(
3540 fields,
3541 encodings,
3542 Some(Projector {
3543 writer_to_reader: Arc::from(writer_to_reader),
3544 skip_decoders,
3545 field_defaults: vec![None; reader_fields.len()],
3546 default_injections: Arc::from(Vec::<(usize, AvroLiteral)>::new()),
3547 }),
3548 )
3549 }
3550
#[test]
fn test_skip_writer_trailing_field_int32() {
    // Writer record: (id: int, <extra int>). The reader keeps only `id`, and the
    // trailing writer field is skipped.
    let mut dec = make_record_resolved_decoder(
        &[("id", DataType::Int32, false)],
        vec![Some(0), None],
        vec![None, Some(Skipper::Int32)],
    );

    let data = [encode_avro_int(7), encode_avro_int(999)].concat();
    let mut cur = AvroCursor::new(&data);
    dec.decode(&mut cur).unwrap();
    // The skipped value must be consumed too.
    assert_eq!(cur.position(), data.len());

    let arr = dec.flush(None).unwrap();
    let struct_arr = arr.as_any().downcast_ref::<StructArray>().unwrap();
    assert_eq!(struct_arr.len(), 1);
    let id_col = struct_arr.column_by_name("id").unwrap();
    let id = id_col.as_any().downcast_ref::<Int32Array>().unwrap();
    assert_eq!(id.value(0), 7);
}
3575
#[test]
fn test_skip_writer_middle_field_string() {
    // Writer record: (id: int, <string>, score: long). The middle string is skipped.
    let mut dec = make_record_resolved_decoder(
        &[
            ("id", DataType::Int32, false),
            ("score", DataType::Int64, false),
        ],
        vec![Some(0), None, Some(1)],
        vec![None, Some(Skipper::String), None],
    );

    let data = [
        encode_avro_int(42),
        encode_avro_bytes(b"abcdef"),
        encode_avro_long(1000),
    ]
    .concat();
    let mut cur = AvroCursor::new(&data);
    dec.decode(&mut cur).unwrap();
    assert_eq!(cur.position(), data.len());

    let arr = dec.flush(None).unwrap();
    let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
    let id_col = s.column_by_name("id").unwrap();
    let id = id_col.as_any().downcast_ref::<Int32Array>().unwrap();
    let score_col = s.column_by_name("score").unwrap();
    let score = score_col.as_any().downcast_ref::<Int64Array>().unwrap();
    assert_eq!(id.value(0), 42);
    assert_eq!(score.value(0), 1000);
}
3610
#[test]
fn test_skip_writer_array_with_negative_block_count_fast() {
    // Writer record: (<array<int>>, id: int). The leading array is skipped.
    let mut dec = make_record_resolved_decoder(
        &[("id", DataType::Int32, false)],
        vec![None, Some(0)],
        vec![Some(Skipper::List(Box::new(Skipper::Int32))), None],
    );

    let mut items = Vec::new();
    for v in [1, 2, 3] {
        items.extend(encode_avro_int(v));
    }

    // One block with a negative item count (-3) followed by its byte size, then
    // the items, then the terminating empty block, and finally the reader's `id`.
    let mut data = encode_avro_long(-3);
    data.extend(encode_avro_long(items.len() as i64));
    data.extend_from_slice(&items);
    data.extend(encode_avro_long(0));
    data.extend(encode_avro_int(5));

    let mut cur = AvroCursor::new(&data);
    dec.decode(&mut cur).unwrap();
    assert_eq!(cur.position(), data.len());

    let arr = dec.flush(None).unwrap();
    let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
    let id_col = s.column_by_name("id").unwrap();
    let id = id_col.as_any().downcast_ref::<Int32Array>().unwrap();
    assert_eq!(id.len(), 1);
    assert_eq!(id.value(0), 5);
}
3642
#[test]
fn test_skip_writer_map_with_negative_block_count_fast() {
    // Writer record: (<map<int>>, id: int). The leading map is skipped.
    let mut dec = make_record_resolved_decoder(
        &[("id", DataType::Int32, false)],
        vec![None, Some(0)],
        vec![Some(Skipper::Map(Box::new(Skipper::Int32))), None],
    );

    let mut entries = Vec::new();
    for (key, value) in [(b"k1", 10), (b"k2", 20)] {
        entries.extend(encode_avro_bytes(key));
        entries.extend(encode_avro_int(value));
    }

    // One block with a negative entry count (-2) followed by its byte size, then
    // the entries, then the terminating empty block, and finally the reader's `id`.
    let mut data = encode_avro_long(-2);
    data.extend(encode_avro_long(entries.len() as i64));
    data.extend_from_slice(&entries);
    data.extend(encode_avro_long(0));
    data.extend(encode_avro_int(123));

    let mut cur = AvroCursor::new(&data);
    dec.decode(&mut cur).unwrap();
    assert_eq!(cur.position(), data.len());

    let arr = dec.flush(None).unwrap();
    let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
    let id_col = s.column_by_name("id").unwrap();
    let id = id_col.as_any().downcast_ref::<Int32Array>().unwrap();
    assert_eq!(id.len(), 1);
    assert_eq!(id.value(0), 123);
}
3674 }
3675
3676 #[test]
3677 fn test_skip_writer_nullable_field_union_nullfirst() {
3678 let mut dec = make_record_resolved_decoder(
3679 &[("id", DataType::Int32, false)],
3680 vec![None, Some(0)],
3681 vec![
3682 Some(super::Skipper::Nullable(
3683 Nullability::NullFirst,
3684 Box::new(super::Skipper::Int32),
3685 )),
3686 None,
3687 ],
3688 );
3689 let mut row1 = Vec::new();
3690 row1.extend_from_slice(&encode_avro_long(0));
3691 row1.extend_from_slice(&encode_avro_int(5));
3692 let mut row2 = Vec::new();
3693 row2.extend_from_slice(&encode_avro_long(1));
3694 row2.extend_from_slice(&encode_avro_int(123));
3695 row2.extend_from_slice(&encode_avro_int(7));
3696 let mut cur1 = AvroCursor::new(&row1);
3697 let mut cur2 = AvroCursor::new(&row2);
3698 dec.decode(&mut cur1).unwrap();
3699 dec.decode(&mut cur2).unwrap();
3700 assert_eq!(cur1.position(), row1.len());
3701 assert_eq!(cur2.position(), row2.len());
3702 let arr = dec.flush(None).unwrap();
3703 let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
3704 let id = s
3705 .column_by_name("id")
3706 .unwrap()
3707 .as_any()
3708 .downcast_ref::<Int32Array>()
3709 .unwrap();
3710 assert_eq!(id.len(), 2);
3711 assert_eq!(id.value(0), 5);
3712 assert_eq!(id.value(1), 7);
3713 }
3714
3715 fn make_dense_union_avro(
3716 children: Vec<(Codec, &'_ str, DataType)>,
3717 type_ids: Vec<i8>,
3718 ) -> AvroDataType {
3719 let mut avro_children: Vec<AvroDataType> = Vec::with_capacity(children.len());
3720 let mut fields: Vec<arrow_schema::Field> = Vec::with_capacity(children.len());
3721 for (codec, name, dt) in children.into_iter() {
3722 avro_children.push(AvroDataType::new(codec, Default::default(), None));
3723 fields.push(arrow_schema::Field::new(name, dt, true));
3724 }
3725 let union_fields = UnionFields::try_new(type_ids, fields).unwrap();
3726 let union_codec = Codec::Union(avro_children.into(), union_fields, UnionMode::Dense);
3727 AvroDataType::new(union_codec, Default::default(), None)
3728 }
3729
#[test]
fn test_union_dense_two_children_custom_type_ids() {
    // Avro branch 0 -> Arrow type id 2 (Int32); branch 1 -> type id 5 (Utf8).
    let union_dt = make_dense_union_avro(
        vec![
            (Codec::Int32, "i", DataType::Int32),
            (Codec::Utf8, "s", DataType::Utf8),
        ],
        vec![2, 5],
    );
    let mut dec = Decoder::try_new(&union_dt).unwrap();

    let rows = [
        [encode_avro_long(0), encode_avro_int(7)].concat(),
        [encode_avro_long(1), encode_avro_bytes(b"x")].concat(),
        [encode_avro_long(0), encode_avro_int(-1)].concat(),
    ];
    for row in &rows {
        dec.decode(&mut AvroCursor::new(row)).unwrap();
    }

    let array = dec.flush(None).unwrap();
    let ua = array
        .as_any()
        .downcast_ref::<UnionArray>()
        .expect("expected UnionArray");
    assert_eq!(ua.len(), 3);

    // (type id, offset into that child) for each row.
    for (row, (type_id, offset)) in [(2, 0), (5, 0), (2, 1)].into_iter().enumerate() {
        assert_eq!(ua.type_id(row), type_id);
        assert_eq!(ua.value_offset(row), offset);
    }

    let int_child_ref = ua.child(2);
    let int_child = int_child_ref
        .as_any()
        .downcast_ref::<Int32Array>()
        .expect("int child");
    assert_eq!(int_child.len(), 2);
    assert_eq!(int_child.value(0), 7);
    assert_eq!(int_child.value(1), -1);

    let str_child_ref = ua.child(5);
    let str_child = str_child_ref
        .as_any()
        .downcast_ref::<StringArray>()
        .expect("string child");
    assert_eq!(str_child.len(), 1);
    assert_eq!(str_child.value(0), "x");
}
3780
#[test]
fn test_union_dense_with_null_and_string_children() {
    // Avro branch 0 -> Arrow type id 42 (Null); branch 1 -> type id 7 (Utf8).
    let union_dt = make_dense_union_avro(
        vec![
            (Codec::Null, "n", DataType::Null),
            (Codec::Utf8, "s", DataType::Utf8),
        ],
        vec![42, 7],
    );
    let mut dec = Decoder::try_new(&union_dt).unwrap();

    // The null branch carries no payload beyond its branch index.
    let rows = [
        encode_avro_long(0),
        [encode_avro_long(1), encode_avro_bytes(b"abc")].concat(),
        encode_avro_long(0),
    ];
    for row in &rows {
        dec.decode(&mut AvroCursor::new(row)).unwrap();
    }

    let array = dec.flush(None).unwrap();
    let ua = array
        .as_any()
        .downcast_ref::<UnionArray>()
        .expect("expected UnionArray");
    assert_eq!(ua.len(), 3);

    // (type id, offset into that child) for each row.
    for (row, (type_id, offset)) in [(42, 0), (7, 0), (42, 1)].into_iter().enumerate() {
        assert_eq!(ua.type_id(row), type_id);
        assert_eq!(ua.value_offset(row), offset);
    }

    let null_child_ref = ua.child(42);
    let null_child = null_child_ref
        .as_any()
        .downcast_ref::<NullArray>()
        .expect("null child");
    assert_eq!(null_child.len(), 2);

    let str_child_ref = ua.child(7);
    let str_child = str_child_ref
        .as_any()
        .downcast_ref::<StringArray>()
        .expect("string child");
    assert_eq!(str_child.len(), 1);
    assert_eq!(str_child.value(0), "abc");
}
3825
#[test]
fn test_union_decode_negative_branch_index_errors() {
    let union_dt = make_dense_union_avro(
        vec![
            (Codec::Int32, "i", DataType::Int32),
            (Codec::Utf8, "s", DataType::Utf8),
        ],
        vec![0, 1],
    );
    let mut dec = Decoder::try_new(&union_dt).unwrap();

    // A negative branch index cannot select any union member.
    let row = encode_avro_long(-1);
    let msg = dec
        .decode(&mut AvroCursor::new(&row))
        .expect_err("expected error for negative branch index")
        .to_string();
    assert!(
        msg.contains("Negative union branch index"),
        "unexpected error message: {msg}"
    );
}
3846
#[test]
fn test_union_decode_out_of_range_branch_index_errors() {
    let union_dt = make_dense_union_avro(
        vec![
            (Codec::Int32, "i", DataType::Int32),
            (Codec::Utf8, "s", DataType::Utf8),
        ],
        vec![10, 11],
    );
    let mut dec = Decoder::try_new(&union_dt).unwrap();

    // Only Avro branches 0 and 1 exist, so branch index 2 is out of range.
    let row = encode_avro_long(2);
    let msg = dec
        .decode(&mut AvroCursor::new(&row))
        .expect_err("expected error for out-of-range branch index")
        .to_string();
    assert!(
        msg.contains("out of range"),
        "unexpected error message: {msg}"
    );
}
3867
#[test]
fn test_union_sparse_mode_not_supported() {
    // Constructing a decoder for a sparse Arrow union must be rejected.
    let children: Vec<AvroDataType> = [Codec::Int32, Codec::Utf8]
        .into_iter()
        .map(|codec| AvroDataType::new(codec, Default::default(), None))
        .collect();
    let fields = vec![
        arrow_schema::Field::new("i", DataType::Int32, true),
        arrow_schema::Field::new("s", DataType::Utf8, true),
    ];
    let uf = UnionFields::try_new(vec![1, 3], fields).unwrap();
    let dt = AvroDataType::new(
        Codec::Union(children.into(), uf, UnionMode::Sparse),
        Default::default(),
        None,
    );

    let msg = Decoder::try_new(&dt)
        .expect_err("sparse union should not be supported")
        .to_string();
    assert!(
        msg.contains("Sparse Arrow unions are not yet supported"),
        "unexpected error message: {msg}"
    );
}
3891
3892 fn make_record_decoder_with_projector_defaults(
3893 reader_fields: &[(&str, DataType, bool)],
3894 field_defaults: Vec<Option<AvroLiteral>>,
3895 default_injections: Vec<(usize, AvroLiteral)>,
3896 writer_to_reader_len: usize,
3897 ) -> Decoder {
3898 assert_eq!(
3899 field_defaults.len(),
3900 reader_fields.len(),
3901 "field_defaults must have one entry per reader field"
3902 );
3903 let mut field_refs: Vec<FieldRef> = Vec::with_capacity(reader_fields.len());
3904 let mut encodings: Vec<Decoder> = Vec::with_capacity(reader_fields.len());
3905 for (name, dt, nullable) in reader_fields {
3906 field_refs.push(Arc::new(ArrowField::new(*name, dt.clone(), *nullable)));
3907 let enc = match dt {
3908 DataType::Int32 => Decoder::Int32(Vec::with_capacity(DEFAULT_CAPACITY)),
3909 DataType::Int64 => Decoder::Int64(Vec::with_capacity(DEFAULT_CAPACITY)),
3910 DataType::Utf8 => Decoder::String(
3911 OffsetBufferBuilder::new(DEFAULT_CAPACITY),
3912 Vec::with_capacity(DEFAULT_CAPACITY),
3913 ),
3914 other => panic!("Unsupported test field type in helper: {other:?}"),
3915 };
3916 encodings.push(enc);
3917 }
3918 let fields: Fields = field_refs.into();
3919 let skip_decoders: Vec<Option<Skipper>> =
3920 (0..writer_to_reader_len).map(|_| None::<Skipper>).collect();
3921 let projector = Projector {
3922 writer_to_reader: Arc::from(vec![None; writer_to_reader_len]),
3923 skip_decoders,
3924 field_defaults,
3925 default_injections: Arc::from(default_injections),
3926 };
3927 Decoder::Record(fields, encodings, Some(projector))
3928 }
3929
#[test]
fn test_default_append_int32_and_int64_from_int_and_long() {
    // An Int32 decoder accepts an `int` default.
    let mut int32_decoder = Decoder::Int32(Vec::with_capacity(DEFAULT_CAPACITY));
    int32_decoder.append_default(&AvroLiteral::Int(42)).unwrap();
    let int32_arr = int32_decoder.flush(None).unwrap();
    let ints = int32_arr.as_any().downcast_ref::<Int32Array>().unwrap();
    assert_eq!(ints.len(), 1);
    assert_eq!(ints.value(0), 42);

    // An Int64 decoder accepts both `int` and `long` defaults.
    let mut int64_decoder = Decoder::Int64(Vec::with_capacity(DEFAULT_CAPACITY));
    for literal in [AvroLiteral::Int(5), AvroLiteral::Long(7)] {
        int64_decoder.append_default(&literal).unwrap();
    }
    let int64_arr = int64_decoder.flush(None).unwrap();
    let longs = int64_arr.as_any().downcast_ref::<Int64Array>().unwrap();
    assert_eq!(longs.len(), 2);
    assert_eq!(longs.value(0), 5);
    assert_eq!(longs.value(1), 7);
}
3947
#[test]
fn test_default_append_floats_and_doubles() {
    // `float` default into a Float32 decoder.
    let mut float_decoder = Decoder::Float32(Vec::with_capacity(DEFAULT_CAPACITY));
    float_decoder.append_default(&AvroLiteral::Float(1.5)).unwrap();
    let float_arr = float_decoder.flush(None).unwrap();
    let floats = float_arr.as_any().downcast_ref::<Float32Array>().unwrap();
    assert_eq!(floats.value(0), 1.5);

    // `double` default into a Float64 decoder.
    let mut double_decoder = Decoder::Float64(Vec::with_capacity(DEFAULT_CAPACITY));
    double_decoder
        .append_default(&AvroLiteral::Double(2.25))
        .unwrap();
    let double_arr = double_decoder.flush(None).unwrap();
    let doubles = double_arr.as_any().downcast_ref::<Float64Array>().unwrap();
    assert_eq!(doubles.value(0), 2.25);
}
3961
#[test]
fn test_default_append_string_and_bytes() {
    let new_string_decoder = || {
        Decoder::String(
            OffsetBufferBuilder::new(DEFAULT_CAPACITY),
            Vec::with_capacity(DEFAULT_CAPACITY),
        )
    };

    // A string default is accepted by a string decoder.
    let mut string_decoder = new_string_decoder();
    string_decoder
        .append_default(&AvroLiteral::String("hi".into()))
        .unwrap();
    let string_arr = string_decoder.flush(None).unwrap();
    let strings = string_arr.as_any().downcast_ref::<StringArray>().unwrap();
    assert_eq!(strings.value(0), "hi");

    // A bytes default is accepted by a binary decoder.
    let mut binary_decoder = Decoder::Binary(
        OffsetBufferBuilder::new(DEFAULT_CAPACITY),
        Vec::with_capacity(DEFAULT_CAPACITY),
    );
    binary_decoder
        .append_default(&AvroLiteral::Bytes(vec![1, 2, 3]))
        .unwrap();
    let binary_arr = binary_decoder.flush(None).unwrap();
    let binaries = binary_arr.as_any().downcast_ref::<BinaryArray>().unwrap();
    assert_eq!(binaries.value(0), &[1, 2, 3]);

    // A bytes default is rejected by a string decoder.
    let mut rejecting_decoder = new_string_decoder();
    let err = rejecting_decoder
        .append_default(&AvroLiteral::Bytes(vec![0x61, 0x62]))
        .unwrap_err();
    assert!(
        err.to_string()
            .contains("Default for string must be string"),
        "unexpected error: {err:?}"
    );
}
3997
3998 #[test]
3999 fn test_default_append_nullable_int32_null_and_value() {
4000 let inner = Decoder::Int32(Vec::with_capacity(DEFAULT_CAPACITY));
4001 let mut dec = Decoder::Nullable(
4002 Nullability::NullFirst,
4003 NullBufferBuilder::new(DEFAULT_CAPACITY),
4004 Box::new(inner),
4005 NullablePlan::ReadTag,
4006 );
4007 dec.append_default(&AvroLiteral::Null).unwrap();
4008 dec.append_default(&AvroLiteral::Int(11)).unwrap();
4009 let arr = dec.flush(None).unwrap();
4010 let a = arr.as_any().downcast_ref::<Int32Array>().unwrap();
4011 assert_eq!(a.len(), 2);
4012 assert!(a.is_null(0));
4013 assert_eq!(a.value(1), 11);
4014 }
4015
4016 #[test]
4017 fn test_default_append_array_of_ints() {
4018 let list_dt = avro_from_codec(Codec::List(Arc::new(avro_from_codec(Codec::Int32))));
4019 let mut d = Decoder::try_new(&list_dt).unwrap();
4020 let items = vec![
4021 AvroLiteral::Int(1),
4022 AvroLiteral::Int(2),
4023 AvroLiteral::Int(3),
4024 ];
4025 d.append_default(&AvroLiteral::Array(items)).unwrap();
4026 let arr = d.flush(None).unwrap();
4027 let list = arr.as_any().downcast_ref::<ListArray>().unwrap();
4028 assert_eq!(list.len(), 1);
4029 assert_eq!(list.value_length(0), 3);
4030 let vals = list.values().as_any().downcast_ref::<Int32Array>().unwrap();
4031 assert_eq!(vals.values(), &[1, 2, 3]);
4032 }
4033
4034 #[test]
4035 fn test_default_append_map_string_to_int() {
4036 let map_dt = avro_from_codec(Codec::Map(Arc::new(avro_from_codec(Codec::Int32))));
4037 let mut d = Decoder::try_new(&map_dt).unwrap();
4038 let mut m: IndexMap<String, AvroLiteral> = IndexMap::new();
4039 m.insert("k1".to_string(), AvroLiteral::Int(10));
4040 m.insert("k2".to_string(), AvroLiteral::Int(20));
4041 d.append_default(&AvroLiteral::Map(m)).unwrap();
4042 let arr = d.flush(None).unwrap();
4043 let map = arr.as_any().downcast_ref::<MapArray>().unwrap();
4044 assert_eq!(map.len(), 1);
4045 assert_eq!(map.value_length(0), 2);
4046 let binding = map.value(0);
4047 let entries = binding.as_any().downcast_ref::<StructArray>().unwrap();
4048 let k = entries
4049 .column_by_name("key")
4050 .unwrap()
4051 .as_any()
4052 .downcast_ref::<StringArray>()
4053 .unwrap();
4054 let v = entries
4055 .column_by_name("value")
4056 .unwrap()
4057 .as_any()
4058 .downcast_ref::<Int32Array>()
4059 .unwrap();
4060 let keys: std::collections::HashSet<&str> = (0..k.len()).map(|i| k.value(i)).collect();
4061 assert_eq!(keys, ["k1", "k2"].into_iter().collect());
4062 let vals: std::collections::HashSet<i32> = (0..v.len()).map(|i| v.value(i)).collect();
4063 assert_eq!(vals, [10, 20].into_iter().collect());
4064 }
4065
4066 #[test]
4067 fn test_default_append_enum_by_symbol() {
4068 let symbols: Arc<[String]> = vec!["A".into(), "B".into(), "C".into()].into();
4069 let mut d = Decoder::Enum(Vec::with_capacity(DEFAULT_CAPACITY), symbols.clone(), None);
4070 d.append_default(&AvroLiteral::Enum("B".into())).unwrap();
4071 let arr = d.flush(None).unwrap();
4072 let dict = arr
4073 .as_any()
4074 .downcast_ref::<DictionaryArray<Int32Type>>()
4075 .unwrap();
4076 assert_eq!(dict.len(), 1);
4077 let expected = Int32Array::from(vec![1]);
4078 assert_eq!(dict.keys(), &expected);
4079 let values = dict
4080 .values()
4081 .as_any()
4082 .downcast_ref::<StringArray>()
4083 .unwrap();
4084 assert_eq!(values.value(1), "B");
4085 }
4086
4087 #[test]
4088 fn test_default_append_uuid_and_type_error() {
4089 let mut d = Decoder::Uuid(Vec::with_capacity(DEFAULT_CAPACITY));
4090 let uuid_str = "123e4567-e89b-12d3-a456-426614174000";
4091 d.append_default(&AvroLiteral::String(uuid_str.into()))
4092 .unwrap();
4093 let arr_ref = d.flush(None).unwrap();
4094 let arr = arr_ref
4095 .as_any()
4096 .downcast_ref::<FixedSizeBinaryArray>()
4097 .unwrap();
4098 assert_eq!(arr.value_length(), 16);
4099 assert_eq!(arr.len(), 1);
4100 let mut d2 = Decoder::Uuid(Vec::with_capacity(DEFAULT_CAPACITY));
4101 let err = d2
4102 .append_default(&AvroLiteral::Bytes(vec![0u8; 16]))
4103 .unwrap_err();
4104 assert!(
4105 err.to_string().contains("Default for uuid must be string"),
4106 "unexpected error: {err:?}"
4107 );
4108 }
4109
4110 #[test]
4111 fn test_default_append_fixed_and_length_mismatch() {
4112 let mut d = Decoder::Fixed(4, Vec::with_capacity(DEFAULT_CAPACITY));
4113 d.append_default(&AvroLiteral::Bytes(vec![1, 2, 3, 4]))
4114 .unwrap();
4115 let arr_ref = d.flush(None).unwrap();
4116 let arr = arr_ref
4117 .as_any()
4118 .downcast_ref::<FixedSizeBinaryArray>()
4119 .unwrap();
4120 assert_eq!(arr.value_length(), 4);
4121 assert_eq!(arr.value(0), &[1, 2, 3, 4]);
4122 let mut d_err = Decoder::Fixed(4, Vec::with_capacity(DEFAULT_CAPACITY));
4123 let err = d_err
4124 .append_default(&AvroLiteral::Bytes(vec![1, 2, 3]))
4125 .unwrap_err();
4126 assert!(
4127 err.to_string().contains("Fixed default length"),
4128 "unexpected error: {err:?}"
4129 );
4130 }
4131
4132 #[test]
4133 fn test_default_append_duration_and_length_validation() {
4134 let dt = avro_from_codec(Codec::Interval);
4135 let mut d = Decoder::try_new(&dt).unwrap();
4136 let mut bytes = Vec::with_capacity(12);
4137 bytes.extend_from_slice(&1u32.to_le_bytes());
4138 bytes.extend_from_slice(&2u32.to_le_bytes());
4139 bytes.extend_from_slice(&3u32.to_le_bytes());
4140 d.append_default(&AvroLiteral::Bytes(bytes)).unwrap();
4141 let arr_ref = d.flush(None).unwrap();
4142 let arr = arr_ref
4143 .as_any()
4144 .downcast_ref::<IntervalMonthDayNanoArray>()
4145 .unwrap();
4146 assert_eq!(arr.len(), 1);
4147 let v = arr.value(0);
4148 assert_eq!(v.months, 1);
4149 assert_eq!(v.days, 2);
4150 assert_eq!(v.nanoseconds, 3_000_000);
4151 let mut d_err = Decoder::try_new(&avro_from_codec(Codec::Interval)).unwrap();
4152 let err = d_err
4153 .append_default(&AvroLiteral::Bytes(vec![0u8; 11]))
4154 .unwrap_err();
4155 assert!(
4156 err.to_string()
4157 .contains("Duration default must be exactly 12 bytes"),
4158 "unexpected error: {err:?}"
4159 );
4160 }
4161
4162 #[test]
4163 fn test_default_append_decimal256_from_bytes() {
4164 let dt = avro_from_codec(Codec::Decimal(50, Some(2), Some(32)));
4165 let mut d = Decoder::try_new(&dt).unwrap();
4166 let pos: [u8; 32] = [
4167 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
4168 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
4169 0x00, 0x00, 0x30, 0x39,
4170 ];
4171 d.append_default(&AvroLiteral::Bytes(pos.to_vec())).unwrap();
4172 let neg: [u8; 32] = [
4173 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
4174 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
4175 0xFF, 0xFF, 0xFF, 0x85,
4176 ];
4177 d.append_default(&AvroLiteral::Bytes(neg.to_vec())).unwrap();
4178 let arr = d.flush(None).unwrap();
4179 let dec = arr.as_any().downcast_ref::<Decimal256Array>().unwrap();
4180 assert_eq!(dec.len(), 2);
4181 assert_eq!(dec.value_as_string(0), "123.45");
4182 assert_eq!(dec.value_as_string(1), "-1.23");
4183 }
4184
4185 #[test]
4186 fn test_record_append_default_map_missing_fields_uses_projector_field_defaults() {
4187 let field_defaults = vec![None, Some(AvroLiteral::String("hi".into()))];
4188 let mut rec = make_record_decoder_with_projector_defaults(
4189 &[("a", DataType::Int32, false), ("b", DataType::Utf8, false)],
4190 field_defaults,
4191 vec![],
4192 0,
4193 );
4194 let mut map: IndexMap<String, AvroLiteral> = IndexMap::new();
4195 map.insert("a".to_string(), AvroLiteral::Int(7));
4196 rec.append_default(&AvroLiteral::Map(map)).unwrap();
4197 let arr = rec.flush(None).unwrap();
4198 let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
4199 let a = s
4200 .column_by_name("a")
4201 .unwrap()
4202 .as_any()
4203 .downcast_ref::<Int32Array>()
4204 .unwrap();
4205 let b = s
4206 .column_by_name("b")
4207 .unwrap()
4208 .as_any()
4209 .downcast_ref::<StringArray>()
4210 .unwrap();
4211 assert_eq!(a.value(0), 7);
4212 assert_eq!(b.value(0), "hi");
4213 }
4214
4215 #[test]
4216 fn test_record_append_default_null_uses_projector_field_defaults() {
4217 let field_defaults = vec![
4218 Some(AvroLiteral::Int(5)),
4219 Some(AvroLiteral::String("x".into())),
4220 ];
4221 let mut rec = make_record_decoder_with_projector_defaults(
4222 &[("a", DataType::Int32, false), ("b", DataType::Utf8, false)],
4223 field_defaults,
4224 vec![],
4225 0,
4226 );
4227 rec.append_default(&AvroLiteral::Null).unwrap();
4228 let arr = rec.flush(None).unwrap();
4229 let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
4230 let a = s
4231 .column_by_name("a")
4232 .unwrap()
4233 .as_any()
4234 .downcast_ref::<Int32Array>()
4235 .unwrap();
4236 let b = s
4237 .column_by_name("b")
4238 .unwrap()
4239 .as_any()
4240 .downcast_ref::<StringArray>()
4241 .unwrap();
4242 assert_eq!(a.value(0), 5);
4243 assert_eq!(b.value(0), "x");
4244 }
4245
4246 #[test]
4247 fn test_record_append_default_missing_fields_without_projector_defaults_yields_type_nulls_or_empties()
4248 {
4249 let fields = vec![("a", DataType::Int32, true), ("b", DataType::Utf8, true)];
4250 let mut field_refs: Vec<FieldRef> = Vec::new();
4251 let mut encoders: Vec<Decoder> = Vec::new();
4252 for (name, dt, nullable) in &fields {
4253 field_refs.push(Arc::new(ArrowField::new(*name, dt.clone(), *nullable)));
4254 }
4255 let enc_a = Decoder::Nullable(
4256 Nullability::NullSecond,
4257 NullBufferBuilder::new(DEFAULT_CAPACITY),
4258 Box::new(Decoder::Int32(Vec::with_capacity(DEFAULT_CAPACITY))),
4259 NullablePlan::ReadTag,
4260 );
4261 let enc_b = Decoder::Nullable(
4262 Nullability::NullSecond,
4263 NullBufferBuilder::new(DEFAULT_CAPACITY),
4264 Box::new(Decoder::String(
4265 OffsetBufferBuilder::new(DEFAULT_CAPACITY),
4266 Vec::with_capacity(DEFAULT_CAPACITY),
4267 )),
4268 NullablePlan::ReadTag,
4269 );
4270 encoders.push(enc_a);
4271 encoders.push(enc_b);
4272 let projector = Projector {
4273 writer_to_reader: Arc::from(vec![]),
4274 skip_decoders: vec![],
4275 field_defaults: vec![None, None], default_injections: Arc::from(Vec::<(usize, AvroLiteral)>::new()),
4277 };
4278 let mut rec = Decoder::Record(field_refs.into(), encoders, Some(projector));
4279 let mut map: IndexMap<String, AvroLiteral> = IndexMap::new();
4280 map.insert("a".to_string(), AvroLiteral::Int(9));
4281 rec.append_default(&AvroLiteral::Map(map)).unwrap();
4282 let arr = rec.flush(None).unwrap();
4283 let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
4284 let a = s
4285 .column_by_name("a")
4286 .unwrap()
4287 .as_any()
4288 .downcast_ref::<Int32Array>()
4289 .unwrap();
4290 let b = s
4291 .column_by_name("b")
4292 .unwrap()
4293 .as_any()
4294 .downcast_ref::<StringArray>()
4295 .unwrap();
4296 assert!(a.is_valid(0));
4297 assert_eq!(a.value(0), 9);
4298 assert!(b.is_null(0));
4299 }
4300
4301 #[test]
4302 fn test_projector_default_injection_when_writer_lacks_fields() {
4303 let defaults = vec![None, None];
4304 let injections = vec![
4305 (0, AvroLiteral::Int(99)),
4306 (1, AvroLiteral::String("alice".into())),
4307 ];
4308 let mut rec = make_record_decoder_with_projector_defaults(
4309 &[
4310 ("id", DataType::Int32, false),
4311 ("name", DataType::Utf8, false),
4312 ],
4313 defaults,
4314 injections,
4315 0,
4316 );
4317 rec.decode(&mut AvroCursor::new(&[])).unwrap();
4318 let arr = rec.flush(None).unwrap();
4319 let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
4320 let id = s
4321 .column_by_name("id")
4322 .unwrap()
4323 .as_any()
4324 .downcast_ref::<Int32Array>()
4325 .unwrap();
4326 let name = s
4327 .column_by_name("name")
4328 .unwrap()
4329 .as_any()
4330 .downcast_ref::<StringArray>()
4331 .unwrap();
4332 assert_eq!(id.value(0), 99);
4333 assert_eq!(name.value(0), "alice");
4334 }
4335
4336 #[test]
4337 fn union_type_ids_are_not_child_indexes() {
4338 let encodings: Vec<AvroDataType> =
4339 vec![avro_from_codec(Codec::Int32), avro_from_codec(Codec::Utf8)];
4340 let fields: UnionFields = [
4341 (42_i8, Arc::new(ArrowField::new("a", DataType::Int32, true))),
4342 (7_i8, Arc::new(ArrowField::new("b", DataType::Utf8, true))),
4343 ]
4344 .into_iter()
4345 .collect();
4346 let dt = avro_from_codec(Codec::Union(
4347 encodings.into(),
4348 fields.clone(),
4349 UnionMode::Dense,
4350 ));
4351 let mut dec = Decoder::try_new(&dt).expect("decoder");
4352 let mut b1 = encode_avro_long(1);
4353 b1.extend(encode_avro_bytes("hi".as_bytes()));
4354 dec.decode(&mut AvroCursor::new(&b1)).expect("decode b1");
4355 let mut b0 = encode_avro_long(0);
4356 b0.extend(encode_avro_int(5));
4357 dec.decode(&mut AvroCursor::new(&b0)).expect("decode b0");
4358 let arr = dec.flush(None).expect("flush");
4359 let ua = arr.as_any().downcast_ref::<UnionArray>().expect("union");
4360 assert_eq!(ua.len(), 2);
4361 assert_eq!(ua.type_id(0), 7, "type id must come from UnionFields");
4362 assert_eq!(ua.type_id(1), 42, "type id must come from UnionFields");
4363 assert_eq!(ua.value_offset(0), 0);
4364 assert_eq!(ua.value_offset(1), 0);
4365 let utf8_child = ua.child(7).as_any().downcast_ref::<StringArray>().unwrap();
4366 assert_eq!(utf8_child.len(), 1);
4367 assert_eq!(utf8_child.value(0), "hi");
4368 let int_child = ua.child(42).as_any().downcast_ref::<Int32Array>().unwrap();
4369 assert_eq!(int_child.len(), 1);
4370 assert_eq!(int_child.value(0), 5);
4371 let type_ids: Vec<i8> = fields.iter().map(|(tid, _)| tid).collect();
4372 assert_eq!(type_ids, vec![42_i8, 7_i8]);
4373 }
4374
4375 #[cfg(feature = "avro_custom_types")]
4376 #[test]
4377 fn skipper_from_avro_maps_custom_duration_variants_to_int64() -> Result<(), AvroError> {
4378 for codec in [
4379 Codec::DurationNanos,
4380 Codec::DurationMicros,
4381 Codec::DurationMillis,
4382 Codec::DurationSeconds,
4383 ] {
4384 let dt = make_avro_dt(codec.clone(), None);
4385 let s = Skipper::from_avro(&dt)?;
4386 match s {
4387 Skipper::Int64 => {}
4388 other => panic!("expected Int64 skipper for {:?}, got {:?}", codec, other),
4389 }
4390 }
4391 Ok(())
4392 }
4393
4394 #[cfg(feature = "avro_custom_types")]
4395 #[test]
4396 fn skipper_skip_consumes_one_long_for_custom_durations() -> Result<(), AvroError> {
4397 let values: [i64; 7] = [0, 1, -1, 150, -150, i64::MAX / 3, i64::MIN / 3];
4398 for codec in [
4399 Codec::DurationNanos,
4400 Codec::DurationMicros,
4401 Codec::DurationMillis,
4402 Codec::DurationSeconds,
4403 ] {
4404 let dt = make_avro_dt(codec.clone(), None);
4405 let mut s = Skipper::from_avro(&dt)?;
4406 for &v in &values {
4407 let bytes = encode_avro_long(v);
4408 let mut cursor = AvroCursor::new(&bytes);
4409 s.skip(&mut cursor)?;
4410 assert_eq!(
4411 cursor.position(),
4412 bytes.len(),
4413 "did not consume all bytes for {:?} value {}",
4414 codec,
4415 v
4416 );
4417 }
4418 }
4419 Ok(())
4420 }
4421
4422 #[cfg(feature = "avro_custom_types")]
4423 #[test]
4424 fn skipper_nullable_custom_duration_respects_null_first() -> Result<(), AvroError> {
4425 let dt = make_avro_dt(Codec::DurationNanos, Some(Nullability::NullFirst));
4426 let mut s = Skipper::from_avro(&dt)?;
4427 match &s {
4428 Skipper::Nullable(Nullability::NullFirst, inner) => match **inner {
4429 Skipper::Int64 => {}
4430 ref other => panic!("expected inner Int64, got {:?}", other),
4431 },
4432 other => panic!("expected Nullable(NullFirst, Int64), got {:?}", other),
4433 }
4434 {
4435 let buf = encode_vlq_u64(0);
4436 let mut cursor = AvroCursor::new(&buf);
4437 s.skip(&mut cursor)?;
4438 assert_eq!(cursor.position(), 1, "expected to consume only tag=0");
4439 }
4440 {
4441 let mut buf = encode_vlq_u64(1);
4442 buf.extend(encode_avro_long(0));
4443 let mut cursor = AvroCursor::new(&buf);
4444 s.skip(&mut cursor)?;
4445 assert_eq!(cursor.position(), 2, "expected to consume tag=1 + long(0)");
4446 }
4447
4448 Ok(())
4449 }
4450
4451 #[cfg(feature = "avro_custom_types")]
4452 #[test]
4453 fn skipper_nullable_custom_duration_respects_null_second() -> Result<(), AvroError> {
4454 let dt = make_avro_dt(Codec::DurationMicros, Some(Nullability::NullSecond));
4455 let mut s = Skipper::from_avro(&dt)?;
4456 match &s {
4457 Skipper::Nullable(Nullability::NullSecond, inner) => match **inner {
4458 Skipper::Int64 => {}
4459 ref other => panic!("expected inner Int64, got {:?}", other),
4460 },
4461 other => panic!("expected Nullable(NullSecond, Int64), got {:?}", other),
4462 }
4463 {
4464 let buf = encode_vlq_u64(1);
4465 let mut cursor = AvroCursor::new(&buf);
4466 s.skip(&mut cursor)?;
4467 assert_eq!(cursor.position(), 1, "expected to consume only tag=1");
4468 }
4469 {
4470 let mut buf = encode_vlq_u64(0);
4471 buf.extend(encode_avro_long(-1));
4472 let mut cursor = AvroCursor::new(&buf);
4473 s.skip(&mut cursor)?;
4474 assert_eq!(
4475 cursor.position(),
4476 1 + encode_avro_long(-1).len(),
4477 "expected to consume tag=0 + long(-1)"
4478 );
4479 }
4480 Ok(())
4481 }
4482
4483 #[test]
4484 fn skipper_interval_is_fixed12_and_skips_12_bytes() -> Result<(), AvroError> {
4485 let dt = make_avro_dt(Codec::Interval, None);
4486 let mut s = Skipper::from_avro(&dt)?;
4487 match s {
4488 Skipper::DurationFixed12 => {}
4489 other => panic!("expected DurationFixed12, got {:?}", other),
4490 }
4491 let payload = vec![0u8; 12];
4492 let mut cursor = AvroCursor::new(&payload);
4493 s.skip(&mut cursor)?;
4494 assert_eq!(cursor.position(), 12, "expected to consume 12 fixed bytes");
4495 Ok(())
4496 }
4497
4498 #[cfg(feature = "avro_custom_types")]
4499 #[test]
4500 fn test_run_end_encoded_width16_int32_basic_grouping() {
4501 use arrow_array::RunArray;
4502 use std::sync::Arc;
4503 let inner = avro_from_codec(Codec::Int32);
4504 let ree = AvroDataType::new(
4505 Codec::RunEndEncoded(Arc::new(inner), 16),
4506 Default::default(),
4507 None,
4508 );
4509 let mut dec = Decoder::try_new(&ree).expect("create REE decoder");
4510 for v in [1, 1, 1, 2, 2, 3, 3, 3, 3] {
4511 let bytes = encode_avro_int(v);
4512 dec.decode(&mut AvroCursor::new(&bytes)).expect("decode");
4513 }
4514 let arr = dec.flush(None).expect("flush");
4515 let ra = arr
4516 .as_any()
4517 .downcast_ref::<RunArray<Int16Type>>()
4518 .expect("RunArray<Int16Type>");
4519 assert_eq!(ra.len(), 9);
4520 assert_eq!(ra.run_ends().values(), &[3, 5, 9]);
4521 let vals = ra
4522 .values()
4523 .as_ref()
4524 .as_any()
4525 .downcast_ref::<Int32Array>()
4526 .expect("values Int32");
4527 assert_eq!(vals.values(), &[1, 2, 3]);
4528 }
4529
4530 #[cfg(feature = "avro_custom_types")]
4531 #[test]
4532 fn test_run_end_encoded_width32_nullable_values_group_nulls() {
4533 use arrow_array::RunArray;
4534 use std::sync::Arc;
4535 let inner = AvroDataType::new(
4536 Codec::Int32,
4537 Default::default(),
4538 Some(Nullability::NullSecond),
4539 );
4540 let ree = AvroDataType::new(
4541 Codec::RunEndEncoded(Arc::new(inner), 32),
4542 Default::default(),
4543 None,
4544 );
4545 let mut dec = Decoder::try_new(&ree).expect("create REE decoder");
4546 let seq: [Option<i32>; 8] = [
4547 None,
4548 None,
4549 Some(7),
4550 Some(7),
4551 Some(7),
4552 None,
4553 Some(5),
4554 Some(5),
4555 ];
4556 for item in seq {
4557 let mut bytes = Vec::new();
4558 match item {
4559 None => bytes.extend_from_slice(&encode_vlq_u64(1)),
4560 Some(v) => {
4561 bytes.extend_from_slice(&encode_vlq_u64(0));
4562 bytes.extend_from_slice(&encode_avro_int(v));
4563 }
4564 }
4565 dec.decode(&mut AvroCursor::new(&bytes)).expect("decode");
4566 }
4567 let arr = dec.flush(None).expect("flush");
4568 let ra = arr
4569 .as_any()
4570 .downcast_ref::<RunArray<Int32Type>>()
4571 .expect("RunArray<Int32Type>");
4572 assert_eq!(ra.len(), 8);
4573 assert_eq!(ra.run_ends().values(), &[2, 5, 6, 8]);
4574 let vals = ra
4575 .values()
4576 .as_ref()
4577 .as_any()
4578 .downcast_ref::<Int32Array>()
4579 .expect("values Int32 (nullable)");
4580 assert_eq!(vals.len(), 4);
4581 assert!(vals.is_null(0));
4582 assert_eq!(vals.value(1), 7);
4583 assert!(vals.is_null(2));
4584 assert_eq!(vals.value(3), 5);
4585 }
4586
4587 #[cfg(feature = "avro_custom_types")]
4588 #[test]
4589 fn test_run_end_encoded_decode_with_promotion_int_to_double_via_nullable_from_single() {
4590 use arrow_array::RunArray;
4591 let inner_values = Decoder::Float64(Vec::with_capacity(DEFAULT_CAPACITY));
4592 let ree = Decoder::RunEndEncoded(
4593 8, 0,
4595 Box::new(inner_values),
4596 );
4597 let mut dec = Decoder::Nullable(
4598 Nullability::NullSecond,
4599 NullBufferBuilder::new(DEFAULT_CAPACITY),
4600 Box::new(ree),
4601 NullablePlan::FromSingle {
4602 promotion: Promotion::IntToDouble,
4603 },
4604 );
4605 for v in [1, 1, 2, 2, 2] {
4606 let bytes = encode_avro_int(v);
4607 dec.decode(&mut AvroCursor::new(&bytes)).expect("decode");
4608 }
4609 let arr = dec.flush(None).expect("flush");
4610 let ra = arr
4611 .as_any()
4612 .downcast_ref::<RunArray<Int64Type>>()
4613 .expect("RunArray<Int64Type>");
4614 assert_eq!(ra.len(), 5);
4615 assert_eq!(ra.run_ends().values(), &[2, 5]);
4616 let vals = ra
4617 .values()
4618 .as_ref()
4619 .as_any()
4620 .downcast_ref::<Float64Array>()
4621 .expect("values Float64");
4622 assert_eq!(vals.values(), &[1.0, 2.0]);
4623 }
4624
4625 #[cfg(feature = "avro_custom_types")]
4626 #[test]
4627 fn test_run_end_encoded_unsupported_run_end_width_errors() {
4628 use std::sync::Arc;
4629 let inner = avro_from_codec(Codec::Int32);
4630 let dt = AvroDataType::new(
4631 Codec::RunEndEncoded(Arc::new(inner), 3),
4632 Default::default(),
4633 None,
4634 );
4635 let err = Decoder::try_new(&dt).expect_err("must reject unsupported width");
4636 let msg = err.to_string();
4637 assert!(
4638 msg.contains("Unsupported run-end width")
4639 && msg.contains("16/32/64 bits or 2/4/8 bytes"),
4640 "unexpected error message: {msg}"
4641 );
4642 }
4643
4644 #[cfg(feature = "avro_custom_types")]
4645 #[test]
4646 fn test_run_end_encoded_empty_input_is_empty_runarray() {
4647 use arrow_array::RunArray;
4648 use std::sync::Arc;
4649 let inner = avro_from_codec(Codec::Utf8);
4650 let dt = AvroDataType::new(
4651 Codec::RunEndEncoded(Arc::new(inner), 4),
4652 Default::default(),
4653 None,
4654 );
4655 let mut dec = Decoder::try_new(&dt).expect("create REE decoder");
4656 let arr = dec.flush(None).expect("flush");
4657 let ra = arr
4658 .as_any()
4659 .downcast_ref::<RunArray<Int32Type>>()
4660 .expect("RunArray<Int32Type>");
4661 assert_eq!(ra.len(), 0);
4662 assert_eq!(ra.run_ends().len(), 0);
4663 assert_eq!(ra.values().len(), 0);
4664 }
4665
4666 #[cfg(feature = "avro_custom_types")]
4667 #[test]
4668 fn test_run_end_encoded_strings_grouping_width32_bits() {
4669 use arrow_array::RunArray;
4670 use std::sync::Arc;
4671 let inner = avro_from_codec(Codec::Utf8);
4672 let dt = AvroDataType::new(
4673 Codec::RunEndEncoded(Arc::new(inner), 32),
4674 Default::default(),
4675 None,
4676 );
4677 let mut dec = Decoder::try_new(&dt).expect("create REE decoder");
4678 for s in ["a", "a", "bb", "bb", "bb", "a"] {
4679 let bytes = encode_avro_bytes(s.as_bytes());
4680 dec.decode(&mut AvroCursor::new(&bytes)).expect("decode");
4681 }
4682 let arr = dec.flush(None).expect("flush");
4683 let ra = arr
4684 .as_any()
4685 .downcast_ref::<RunArray<Int32Type>>()
4686 .expect("RunArray<Int32Type>");
4687 assert_eq!(ra.run_ends().values(), &[2, 5, 6]);
4688 let vals = ra
4689 .values()
4690 .as_ref()
4691 .as_any()
4692 .downcast_ref::<StringArray>()
4693 .expect("values String");
4694 assert_eq!(vals.len(), 3);
4695 assert_eq!(vals.value(0), "a");
4696 assert_eq!(vals.value(1), "bb");
4697 assert_eq!(vals.value(2), "a");
4698 }
4699
4700 #[cfg(not(feature = "avro_custom_types"))]
4701 #[test]
4702 fn test_no_custom_types_feature_smoke_decodes_plain_int32() {
4703 let dt = avro_from_codec(Codec::Int32);
4704 let mut dec = Decoder::try_new(&dt).expect("create Int32 decoder");
4705 for v in [1, 2, 3] {
4706 let bytes = encode_avro_int(v);
4707 dec.decode(&mut AvroCursor::new(&bytes)).expect("decode");
4708 }
4709 let arr = dec.flush(None).expect("flush");
4710 let a = arr
4711 .as_any()
4712 .downcast_ref::<Int32Array>()
4713 .expect("Int32Array");
4714 assert_eq!(a.values(), &[1, 2, 3]);
4715 }
4716
4717 #[test]
4718 fn test_timestamp_nanos_decoding_utc() {
4719 let avro_type = avro_from_codec(Codec::TimestampNanos(true));
4720 let mut decoder = Decoder::try_new(&avro_type).expect("create TimestampNanos decoder");
4721 let mut data = Vec::new();
4722 for v in [0_i64, 1_i64, -1_i64, 1_234_567_890_i64] {
4723 data.extend_from_slice(&encode_avro_long(v));
4724 }
4725 let mut cur = AvroCursor::new(&data);
4726 for _ in 0..4 {
4727 decoder.decode(&mut cur).expect("decode nanos ts");
4728 }
4729 let array = decoder.flush(None).expect("flush nanos ts");
4730 let ts = array
4731 .as_any()
4732 .downcast_ref::<TimestampNanosecondArray>()
4733 .expect("TimestampNanosecondArray");
4734 assert_eq!(ts.values(), &[0, 1, -1, 1_234_567_890]);
4735 match ts.data_type() {
4736 DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, tz) => {
4737 assert_eq!(tz.as_deref(), Some("+00:00"));
4738 }
4739 other => panic!("expected Timestamp(Nanosecond, Some(\"+00:00\")), got {other:?}"),
4740 }
4741 }
4742
4743 #[test]
4744 fn test_timestamp_nanos_decoding_local() {
4745 let avro_type = avro_from_codec(Codec::TimestampNanos(false));
4746 let mut decoder = Decoder::try_new(&avro_type).expect("create TimestampNanos decoder");
4747 let mut data = Vec::new();
4748 for v in [10_i64, 20_i64, -30_i64] {
4749 data.extend_from_slice(&encode_avro_long(v));
4750 }
4751 let mut cur = AvroCursor::new(&data);
4752 for _ in 0..3 {
4753 decoder.decode(&mut cur).expect("decode nanos ts");
4754 }
4755 let array = decoder.flush(None).expect("flush nanos ts");
4756 let ts = array
4757 .as_any()
4758 .downcast_ref::<TimestampNanosecondArray>()
4759 .expect("TimestampNanosecondArray");
4760 assert_eq!(ts.values(), &[10, 20, -30]);
4761 match ts.data_type() {
4762 DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, tz) => {
4763 assert_eq!(tz.as_deref(), None);
4764 }
4765 other => panic!("expected Timestamp(Nanosecond, None), got {other:?}"),
4766 }
4767 }
4768
4769 #[test]
4770 fn test_timestamp_nanos_decoding_with_nulls() {
4771 let avro_type = AvroDataType::new(
4772 Codec::TimestampNanos(false),
4773 Default::default(),
4774 Some(Nullability::NullFirst),
4775 );
4776 let mut decoder = Decoder::try_new(&avro_type).expect("create nullable TimestampNanos");
4777 let mut data = Vec::new();
4778 data.extend_from_slice(&encode_avro_long(1));
4779 data.extend_from_slice(&encode_avro_long(42));
4780 data.extend_from_slice(&encode_avro_long(0));
4781 data.extend_from_slice(&encode_avro_long(1));
4782 data.extend_from_slice(&encode_avro_long(-7));
4783 let mut cur = AvroCursor::new(&data);
4784 for _ in 0..3 {
4785 decoder.decode(&mut cur).expect("decode nullable nanos ts");
4786 }
4787 let array = decoder.flush(None).expect("flush nullable nanos ts");
4788 let ts = array
4789 .as_any()
4790 .downcast_ref::<TimestampNanosecondArray>()
4791 .expect("TimestampNanosecondArray");
4792 assert_eq!(ts.len(), 3);
4793 assert!(ts.is_valid(0));
4794 assert!(ts.is_null(1));
4795 assert!(ts.is_valid(2));
4796 assert_eq!(ts.value(0), 42);
4797 assert_eq!(ts.value(2), -7);
4798 match ts.data_type() {
4799 DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, tz) => {
4800 assert_eq!(tz.as_deref(), None);
4801 }
4802 other => panic!("expected Timestamp(Nanosecond, None), got {other:?}"),
4803 }
4804 }
4805}