1use crate::codec::{
19 AvroDataType, AvroField, AvroLiteral, Codec, Promotion, ResolutionInfo, ResolvedRecord,
20 ResolvedUnion,
21};
22use crate::reader::cursor::AvroCursor;
23use crate::schema::Nullability;
24use arrow_array::builder::{Decimal128Builder, Decimal256Builder, IntervalMonthDayNanoBuilder};
25#[cfg(feature = "small_decimals")]
26use arrow_array::builder::{Decimal32Builder, Decimal64Builder};
27use arrow_array::types::*;
28use arrow_array::*;
29use arrow_buffer::*;
30use arrow_schema::{
31 ArrowError, DataType, Field as ArrowField, FieldRef, Fields, Schema as ArrowSchema, SchemaRef,
32 UnionFields, UnionMode, DECIMAL128_MAX_PRECISION, DECIMAL256_MAX_PRECISION,
33};
34#[cfg(feature = "small_decimals")]
35use arrow_schema::{DECIMAL32_MAX_PRECISION, DECIMAL64_MAX_PRECISION};
36use std::cmp::Ordering;
37use std::sync::Arc;
38use strum_macros::AsRefStr;
39use uuid::Uuid;
40
/// Initial capacity requested for the value buffers, offset builders and
/// array builders created by the decoders in this file.
const DEFAULT_CAPACITY: usize = 1024;
42
/// Strategy used by `Decoder::Nullable` to determine whether the next value
/// in the input is null.
#[derive(Clone, Copy, Debug)]
enum NullablePlan {
    /// Read a branch tag from the input and compare it against the null
    /// ordering to decide whether a value follows.
    ReadTag,
    /// Read no tag: decode the value directly with `promotion` applied and
    /// mark it as valid. Selected when the resolution info says the writer is
    /// not a union while the reader is.
    FromSingle { promotion: Promotion },
}
52
/// Reads one decimal value as `$N` big-endian bytes via
/// `read_decimal_bytes_be` (forwarding `$buf` and `$size`), converts the bytes
/// to `$Int` and appends the result to `$builder`.
///
/// Uses `?`, so it must be expanded inside a function returning a compatible
/// `Result`.
macro_rules! decode_decimal {
    ($size:expr, $buf:expr, $builder:expr, $N:expr, $Int:ty) => {{
        let be_bytes = read_decimal_bytes_be::<{ $N }>($buf, $size)?;
        let value = <$Int>::from_be_bytes(be_bytes);
        $builder.append_value(value);
    }};
}
60
/// Finishes `$builder`, rebuilds its values as a `$ArrayTy` carrying `$nulls`
/// as validity, applies the given precision and scale (scale defaults to 0)
/// and evaluates to the array as an `ArrayRef`.
///
/// A precision/scale rejection is reported as `ArrowError::ParseError`.
/// Uses `?`, so it must be expanded inside a function returning a compatible
/// `Result`.
macro_rules! flush_decimal {
    ($builder:expr, $precision:expr, $scale:expr, $nulls:expr, $ArrayTy:ty) => {{
        // Only the value buffer of the finished builder is kept; the validity
        // comes from `$nulls`.
        let (_, values, _) = $builder.finish().into_parts();
        let array = <$ArrayTy>::try_new(values, $nulls)?;
        let target_precision = *$precision as u8;
        let target_scale = $scale.unwrap_or(0) as i8;
        let array = array
            .with_precision_and_scale(target_precision, target_scale)
            .map_err(|err| ArrowError::ParseError(err.to_string()))?;
        Arc::new(array) as ArrayRef
    }};
}
71
/// Appends a decimal default value to `$builder`.
///
/// The literal must be `AvroLiteral::Bytes`; the bytes are passed through
/// `sign_cast_to::<$N>` and interpreted as a big-endian `$Int`. Any other
/// literal kind evaluates to an `ArrowError::InvalidArgumentError` naming
/// `$name`. The macro evaluates to a `Result<(), ArrowError>`.
macro_rules! append_decimal_default {
    ($lit:expr, $builder:expr, $N:literal, $Int:ty, $name:literal) => {{
        if let AvroLiteral::Bytes(raw) = $lit {
            let widened = sign_cast_to::<$N>(raw)?;
            $builder.append_value(<$Int>::from_be_bytes(widened));
            Ok(())
        } else {
            Err(ArrowError::InvalidArgumentError(
                concat!(
                    "Default for ",
                    $name,
                    " must be bytes (two's-complement big-endian)"
                )
                .to_string(),
            ))
        }
    }};
}
94
/// Builder that collects options and then constructs a [`RecordDecoder`].
#[derive(Debug)]
pub(crate) struct RecordDecoderBuilder<'a> {
    /// Avro data type the decoder will be built for.
    data_type: &'a AvroDataType,
    /// Option forwarded to `RecordDecoder::try_new_with_options`; `new` sets
    /// it to `false`.
    use_utf8view: bool,
}
100
101impl<'a> RecordDecoderBuilder<'a> {
102 pub(crate) fn new(data_type: &'a AvroDataType) -> Self {
103 Self {
104 data_type,
105 use_utf8view: false,
106 }
107 }
108
109 pub(crate) fn with_utf8_view(mut self, use_utf8view: bool) -> Self {
110 self.use_utf8view = use_utf8view;
111 self
112 }
113
114 pub(crate) fn build(self) -> Result<RecordDecoder, ArrowError> {
116 RecordDecoder::try_new_with_options(self.data_type, self.use_utf8view)
117 }
118}
119
/// Decodes Avro-encoded records and assembles them into a [`RecordBatch`].
#[derive(Debug)]
pub(crate) struct RecordDecoder {
    /// Arrow schema built from the record's fields; exposed by `schema()` and
    /// used by `flush()` to build the batch.
    schema: SchemaRef,
    /// One decoder per field of the record.
    fields: Vec<Decoder>,
    /// Value of the `use_utf8view` option supplied at construction.
    use_utf8view: bool,
    /// Set when the data type carries record resolution info; `decode` then
    /// routes every record through it instead of decoding field by field.
    projector: Option<Projector>,
}
128
129impl RecordDecoder {
130 pub(crate) fn new(data_type: &'_ AvroDataType) -> Self {
132 RecordDecoderBuilder::new(data_type).build().unwrap()
133 }
134
135 pub(crate) fn try_new(data_type: &AvroDataType) -> Result<Self, ArrowError> {
137 RecordDecoderBuilder::new(data_type)
138 .with_utf8_view(true)
139 .build()
140 }
141
142 pub(crate) fn try_new_with_options(
153 data_type: &AvroDataType,
154 use_utf8view: bool,
155 ) -> Result<Self, ArrowError> {
156 match data_type.codec() {
157 Codec::Struct(reader_fields) => {
158 let mut arrow_fields = Vec::with_capacity(reader_fields.len());
160 let mut encodings = Vec::with_capacity(reader_fields.len());
161 for avro_field in reader_fields.iter() {
162 arrow_fields.push(avro_field.field());
163 encodings.push(Decoder::try_new(avro_field.data_type())?);
164 }
165 let projector = match data_type.resolution.as_ref() {
166 Some(ResolutionInfo::Record(rec)) => {
167 Some(ProjectorBuilder::try_new(rec, reader_fields).build()?)
168 }
169 _ => None,
170 };
171 Ok(Self {
172 schema: Arc::new(ArrowSchema::new(arrow_fields)),
173 fields: encodings,
174 use_utf8view,
175 projector,
176 })
177 }
178 other => Err(ArrowError::ParseError(format!(
179 "Expected record got {other:?}"
180 ))),
181 }
182 }
183
184 pub(crate) fn schema(&self) -> &SchemaRef {
186 &self.schema
187 }
188
189 pub(crate) fn decode(&mut self, buf: &[u8], count: usize) -> Result<usize, ArrowError> {
191 let mut cursor = AvroCursor::new(buf);
192 match self.projector.as_mut() {
193 Some(proj) => {
194 for _ in 0..count {
195 proj.project_record(&mut cursor, &mut self.fields)?;
196 }
197 }
198 None => {
199 for _ in 0..count {
200 for field in &mut self.fields {
201 field.decode(&mut cursor)?;
202 }
203 }
204 }
205 }
206 Ok(cursor.position())
207 }
208
209 pub(crate) fn flush(&mut self) -> Result<RecordBatch, ArrowError> {
211 let arrays = self
212 .fields
213 .iter_mut()
214 .map(|x| x.flush(None))
215 .collect::<Result<Vec<_>, _>>()?;
216 RecordBatch::try_new(self.schema.clone(), arrays)
217 }
218}
219
/// Symbol index resolution applied when decoding `Decoder::Enum` values.
#[derive(Debug)]
struct EnumResolution {
    /// Looked up with the enum index read from the input; a missing or
    /// negative entry means the index could not be mapped and
    /// `default_index` is used instead.
    mapping: Arc<[i32]>,
    /// Index used for indices that cannot be mapped; when it is negative as
    /// well, decoding such an index fails.
    default_index: i32,
}
225
/// Per-column decoder. Each variant owns the buffers that accumulate decoded
/// values for one encoding until they are flushed into an Arrow array.
///
/// `AsRefStr` provides the variant name used in promotion mismatch errors.
#[derive(Debug, AsRefStr)]
enum Decoder {
    /// Number of nulls seen so far; nothing is read from the input.
    Null(usize),
    /// Boolean values.
    Boolean(BooleanBufferBuilder),
    /// Values read with `get_int`.
    Int32(Vec<i32>),
    /// Values read with `get_long`.
    Int64(Vec<i64>),
    /// Long values flushed as `DurationSecondType`.
    #[cfg(feature = "avro_custom_types")]
    DurationSecond(Vec<i64>),
    /// Long values flushed as `DurationMillisecondType`.
    #[cfg(feature = "avro_custom_types")]
    DurationMillisecond(Vec<i64>),
    /// Long values flushed as `DurationMicrosecondType`.
    #[cfg(feature = "avro_custom_types")]
    DurationMicrosecond(Vec<i64>),
    /// Long values flushed as `DurationNanosecondType`.
    #[cfg(feature = "avro_custom_types")]
    DurationNanosecond(Vec<i64>),
    /// Values read with `get_float`.
    Float32(Vec<f32>),
    /// Values read with `get_double`.
    Float64(Vec<f64>),
    /// Int values flushed as `Date32Type`.
    Date32(Vec<i32>),
    /// Int values flushed as `Time32MillisecondType`.
    TimeMillis(Vec<i32>),
    /// Long values flushed as `Time64MicrosecondType`.
    TimeMicros(Vec<i64>),
    /// Long values flushed as `TimestampMillisecondType`; when the flag
    /// (`is_utc`) is true the array gets the `"+00:00"` timezone.
    TimestampMillis(bool, Vec<i64>),
    /// Long values flushed as `TimestampMicrosecondType`; when the flag
    /// (`is_utc`) is true the array gets the `"+00:00"` timezone.
    TimestampMicros(bool, Vec<i64>),
    /// Reads an int and stores it as `i64`.
    Int32ToInt64(Vec<i64>),
    /// Reads an int and stores it as `f32`.
    Int32ToFloat32(Vec<f32>),
    /// Reads an int and stores it as `f64`.
    Int32ToFloat64(Vec<f64>),
    /// Reads a long and stores it as `f32`.
    Int64ToFloat32(Vec<f32>),
    /// Reads a long and stores it as `f64`.
    Int64ToFloat64(Vec<f64>),
    /// Reads a float and stores it as `f64`.
    Float32ToFloat64(Vec<f64>),
    /// Bytes-to-string promotion: (value offsets, value bytes).
    BytesToString(OffsetBufferBuilder<i32>, Vec<u8>),
    /// String-to-bytes promotion: (value offsets, value bytes).
    StringToBytes(OffsetBufferBuilder<i32>, Vec<u8>),
    /// Binary data: (value offsets, value bytes).
    Binary(OffsetBufferBuilder<i32>, Vec<u8>),
    /// String data: (value offsets, value bytes).
    String(OffsetBufferBuilder<i32>, Vec<u8>),
    /// String data for the `Utf8View` codec; accumulated exactly like
    /// `String` as (value offsets, value bytes).
    StringView(OffsetBufferBuilder<i32>, Vec<u8>),
    /// List: (item field, list offsets, item decoder).
    Array(FieldRef, OffsetBufferBuilder<i32>, Box<Decoder>),
    /// Record: (child fields, child decoders, optional projector used when
    /// record resolution info is present).
    Record(Fields, Vec<Decoder>, Option<Projector>),
    /// Map: (entries field, key offsets, map offsets, key bytes, value
    /// decoder).
    Map(
        FieldRef,
        OffsetBufferBuilder<i32>,
        OffsetBufferBuilder<i32>,
        Vec<u8>,
        Box<Decoder>,
    ),
    /// Fixed-size binary: (size in bytes, concatenated value bytes).
    Fixed(i32, Vec<u8>),
    /// Enum: (decoded indices, symbols, optional index resolution).
    Enum(Vec<i32>, Arc<[String]>, Option<EnumResolution>),
    /// Duration read as 12 bytes holding little-endian months, days and
    /// milliseconds.
    Duration(IntervalMonthDayNanoBuilder),
    /// UUIDs parsed from their string form and stored as 16 bytes per value.
    Uuid(Vec<u8>),
    /// Decimal: (precision, optional scale, optional size, builder).
    #[cfg(feature = "small_decimals")]
    Decimal32(usize, Option<usize>, Option<usize>, Decimal32Builder),
    /// Decimal: (precision, optional scale, optional size, builder).
    #[cfg(feature = "small_decimals")]
    Decimal64(usize, Option<usize>, Option<usize>, Decimal64Builder),
    /// Decimal: (precision, optional scale, optional size, builder).
    Decimal128(usize, Option<usize>, Option<usize>, Decimal128Builder),
    /// Decimal: (precision, optional scale, optional size, builder).
    Decimal256(usize, Option<usize>, Option<usize>, Decimal256Builder),
    /// Union values, delegated to `UnionDecoder`.
    Union(UnionDecoder),
    /// Nullable wrapper: (null ordering, validity builder, inner decoder,
    /// plan that decides how nullness is read).
    Nullable(Nullability, NullBufferBuilder, Box<Decoder>, NullablePlan),
}
282
283impl Decoder {
284 fn try_new(data_type: &AvroDataType) -> Result<Self, ArrowError> {
285 if let Some(ResolutionInfo::Union(info)) = data_type.resolution.as_ref() {
286 if info.writer_is_union && !info.reader_is_union {
287 let mut clone = data_type.clone();
288 clone.resolution = None; let target = Box::new(Self::try_new_internal(&clone)?);
290 let decoder = Self::Union(
291 UnionDecoderBuilder::new()
292 .with_resolved_union(info.clone())
293 .with_target(target)
294 .build()?,
295 );
296 return Ok(decoder);
297 }
298 }
299 Self::try_new_internal(data_type)
300 }
301
302 fn try_new_internal(data_type: &AvroDataType) -> Result<Self, ArrowError> {
303 let promotion = match data_type.resolution.as_ref() {
305 Some(ResolutionInfo::Promotion(p)) => Some(p),
306 _ => None,
307 };
308 let decoder = match (data_type.codec(), promotion) {
309 (Codec::Int64, Some(Promotion::IntToLong)) => {
310 Self::Int32ToInt64(Vec::with_capacity(DEFAULT_CAPACITY))
311 }
312 (Codec::Float32, Some(Promotion::IntToFloat)) => {
313 Self::Int32ToFloat32(Vec::with_capacity(DEFAULT_CAPACITY))
314 }
315 (Codec::Float64, Some(Promotion::IntToDouble)) => {
316 Self::Int32ToFloat64(Vec::with_capacity(DEFAULT_CAPACITY))
317 }
318 (Codec::Float32, Some(Promotion::LongToFloat)) => {
319 Self::Int64ToFloat32(Vec::with_capacity(DEFAULT_CAPACITY))
320 }
321 (Codec::Float64, Some(Promotion::LongToDouble)) => {
322 Self::Int64ToFloat64(Vec::with_capacity(DEFAULT_CAPACITY))
323 }
324 (Codec::Float64, Some(Promotion::FloatToDouble)) => {
325 Self::Float32ToFloat64(Vec::with_capacity(DEFAULT_CAPACITY))
326 }
327 (Codec::Utf8, Some(Promotion::BytesToString))
328 | (Codec::Utf8View, Some(Promotion::BytesToString)) => Self::BytesToString(
329 OffsetBufferBuilder::new(DEFAULT_CAPACITY),
330 Vec::with_capacity(DEFAULT_CAPACITY),
331 ),
332 (Codec::Binary, Some(Promotion::StringToBytes)) => Self::StringToBytes(
333 OffsetBufferBuilder::new(DEFAULT_CAPACITY),
334 Vec::with_capacity(DEFAULT_CAPACITY),
335 ),
336 (Codec::Null, _) => Self::Null(0),
337 (Codec::Boolean, _) => Self::Boolean(BooleanBufferBuilder::new(DEFAULT_CAPACITY)),
338 (Codec::Int32, _) => Self::Int32(Vec::with_capacity(DEFAULT_CAPACITY)),
339 (Codec::Int64, _) => Self::Int64(Vec::with_capacity(DEFAULT_CAPACITY)),
340 (Codec::Float32, _) => Self::Float32(Vec::with_capacity(DEFAULT_CAPACITY)),
341 (Codec::Float64, _) => Self::Float64(Vec::with_capacity(DEFAULT_CAPACITY)),
342 (Codec::Binary, _) => Self::Binary(
343 OffsetBufferBuilder::new(DEFAULT_CAPACITY),
344 Vec::with_capacity(DEFAULT_CAPACITY),
345 ),
346 (Codec::Utf8, _) => Self::String(
347 OffsetBufferBuilder::new(DEFAULT_CAPACITY),
348 Vec::with_capacity(DEFAULT_CAPACITY),
349 ),
350 (Codec::Utf8View, _) => Self::StringView(
351 OffsetBufferBuilder::new(DEFAULT_CAPACITY),
352 Vec::with_capacity(DEFAULT_CAPACITY),
353 ),
354 (Codec::Date32, _) => Self::Date32(Vec::with_capacity(DEFAULT_CAPACITY)),
355 (Codec::TimeMillis, _) => Self::TimeMillis(Vec::with_capacity(DEFAULT_CAPACITY)),
356 (Codec::TimeMicros, _) => Self::TimeMicros(Vec::with_capacity(DEFAULT_CAPACITY)),
357 (Codec::TimestampMillis(is_utc), _) => {
358 Self::TimestampMillis(*is_utc, Vec::with_capacity(DEFAULT_CAPACITY))
359 }
360 (Codec::TimestampMicros(is_utc), _) => {
361 Self::TimestampMicros(*is_utc, Vec::with_capacity(DEFAULT_CAPACITY))
362 }
363 #[cfg(feature = "avro_custom_types")]
364 (Codec::DurationNanos, _) => {
365 Self::DurationNanosecond(Vec::with_capacity(DEFAULT_CAPACITY))
366 }
367 #[cfg(feature = "avro_custom_types")]
368 (Codec::DurationMicros, _) => {
369 Self::DurationMicrosecond(Vec::with_capacity(DEFAULT_CAPACITY))
370 }
371 #[cfg(feature = "avro_custom_types")]
372 (Codec::DurationMillis, _) => {
373 Self::DurationMillisecond(Vec::with_capacity(DEFAULT_CAPACITY))
374 }
375 #[cfg(feature = "avro_custom_types")]
376 (Codec::DurationSeconds, _) => {
377 Self::DurationSecond(Vec::with_capacity(DEFAULT_CAPACITY))
378 }
379 (Codec::Fixed(sz), _) => Self::Fixed(*sz, Vec::with_capacity(DEFAULT_CAPACITY)),
380 (Codec::Decimal(precision, scale, size), _) => {
381 let p = *precision;
382 let s = *scale;
383 let prec = p as u8;
384 let scl = s.unwrap_or(0) as i8;
385 #[cfg(feature = "small_decimals")]
386 {
387 if p <= DECIMAL32_MAX_PRECISION as usize {
388 let builder = Decimal32Builder::with_capacity(DEFAULT_CAPACITY)
389 .with_precision_and_scale(prec, scl)?;
390 Self::Decimal32(p, s, *size, builder)
391 } else if p <= DECIMAL64_MAX_PRECISION as usize {
392 let builder = Decimal64Builder::with_capacity(DEFAULT_CAPACITY)
393 .with_precision_and_scale(prec, scl)?;
394 Self::Decimal64(p, s, *size, builder)
395 } else if p <= DECIMAL128_MAX_PRECISION as usize {
396 let builder = Decimal128Builder::with_capacity(DEFAULT_CAPACITY)
397 .with_precision_and_scale(prec, scl)?;
398 Self::Decimal128(p, s, *size, builder)
399 } else if p <= DECIMAL256_MAX_PRECISION as usize {
400 let builder = Decimal256Builder::with_capacity(DEFAULT_CAPACITY)
401 .with_precision_and_scale(prec, scl)?;
402 Self::Decimal256(p, s, *size, builder)
403 } else {
404 return Err(ArrowError::ParseError(format!(
405 "Decimal precision {p} exceeds maximum supported"
406 )));
407 }
408 }
409 #[cfg(not(feature = "small_decimals"))]
410 {
411 if p <= DECIMAL128_MAX_PRECISION as usize {
412 let builder = Decimal128Builder::with_capacity(DEFAULT_CAPACITY)
413 .with_precision_and_scale(prec, scl)?;
414 Self::Decimal128(p, s, *size, builder)
415 } else if p <= DECIMAL256_MAX_PRECISION as usize {
416 let builder = Decimal256Builder::with_capacity(DEFAULT_CAPACITY)
417 .with_precision_and_scale(prec, scl)?;
418 Self::Decimal256(p, s, *size, builder)
419 } else {
420 return Err(ArrowError::ParseError(format!(
421 "Decimal precision {p} exceeds maximum supported"
422 )));
423 }
424 }
425 }
426 (Codec::Interval, _) => Self::Duration(IntervalMonthDayNanoBuilder::new()),
427 (Codec::List(item), _) => {
428 let decoder = Self::try_new(item)?;
429 Self::Array(
430 Arc::new(item.field_with_name("item")),
431 OffsetBufferBuilder::new(DEFAULT_CAPACITY),
432 Box::new(decoder),
433 )
434 }
435 (Codec::Enum(symbols), _) => {
436 let res = match data_type.resolution.as_ref() {
437 Some(ResolutionInfo::EnumMapping(mapping)) => Some(EnumResolution {
438 mapping: mapping.mapping.clone(),
439 default_index: mapping.default_index,
440 }),
441 _ => None,
442 };
443 Self::Enum(Vec::with_capacity(DEFAULT_CAPACITY), symbols.clone(), res)
444 }
445 (Codec::Struct(fields), _) => {
446 let mut arrow_fields = Vec::with_capacity(fields.len());
447 let mut encodings = Vec::with_capacity(fields.len());
448 for avro_field in fields.iter() {
449 let encoding = Self::try_new(avro_field.data_type())?;
450 arrow_fields.push(avro_field.field());
451 encodings.push(encoding);
452 }
453 let projector =
454 if let Some(ResolutionInfo::Record(rec)) = data_type.resolution.as_ref() {
455 Some(ProjectorBuilder::try_new(rec, fields).build()?)
456 } else {
457 None
458 };
459 Self::Record(arrow_fields.into(), encodings, projector)
460 }
461 (Codec::Map(child), _) => {
462 let val_field = child.field_with_name("value");
463 let map_field = Arc::new(ArrowField::new(
464 "entries",
465 DataType::Struct(Fields::from(vec![
466 ArrowField::new("key", DataType::Utf8, false),
467 val_field,
468 ])),
469 false,
470 ));
471 let val_dec = Self::try_new(child)?;
472 Self::Map(
473 map_field,
474 OffsetBufferBuilder::new(DEFAULT_CAPACITY),
475 OffsetBufferBuilder::new(DEFAULT_CAPACITY),
476 Vec::with_capacity(DEFAULT_CAPACITY),
477 Box::new(val_dec),
478 )
479 }
480 (Codec::Uuid, _) => Self::Uuid(Vec::with_capacity(DEFAULT_CAPACITY)),
481 (Codec::Union(encodings, fields, UnionMode::Dense), _) => {
482 let decoders = encodings
483 .iter()
484 .map(Self::try_new_internal)
485 .collect::<Result<Vec<_>, _>>()?;
486 if fields.len() != decoders.len() {
487 return Err(ArrowError::SchemaError(format!(
488 "Union has {} fields but {} decoders",
489 fields.len(),
490 decoders.len()
491 )));
492 }
493 let branch_count = decoders.len();
496 let max_addr = (i32::MAX as usize) + 1;
497 if branch_count > max_addr {
498 return Err(ArrowError::SchemaError(format!(
499 "Union has {branch_count} branches, which exceeds the maximum addressable \
500 branches by an Avro int tag ({} + 1).",
501 i32::MAX
502 )));
503 }
504 let mut builder = UnionDecoderBuilder::new()
505 .with_fields(fields.clone())
506 .with_branches(decoders);
507 if let Some(ResolutionInfo::Union(info)) = data_type.resolution.as_ref() {
508 if info.reader_is_union {
509 builder = builder.with_resolved_union(info.clone());
510 }
511 }
512 Self::Union(builder.build()?)
513 }
514 (Codec::Union(_, _, _), _) => {
515 return Err(ArrowError::NotYetImplemented(
516 "Sparse Arrow unions are not yet supported".to_string(),
517 ));
518 }
519 };
520 Ok(match data_type.nullability() {
521 Some(nullability) => {
522 let mut plan = NullablePlan::ReadTag;
524 if let Some(ResolutionInfo::Union(info)) = data_type.resolution.as_ref() {
525 if !info.writer_is_union && info.reader_is_union {
526 if let Some(Some((_reader_idx, promo))) = info.writer_to_reader.first() {
527 plan = NullablePlan::FromSingle { promotion: *promo };
528 }
529 }
530 }
531 Self::Nullable(
532 nullability,
533 NullBufferBuilder::new(DEFAULT_CAPACITY),
534 Box::new(decoder),
535 plan,
536 )
537 }
538 None => decoder,
539 })
540 }
541
542 fn append_null(&mut self) -> Result<(), ArrowError> {
544 match self {
545 Self::Null(count) => *count += 1,
546 Self::Boolean(b) => b.append(false),
547 Self::Int32(v) | Self::Date32(v) | Self::TimeMillis(v) => v.push(0),
548 Self::Int64(v)
549 | Self::Int32ToInt64(v)
550 | Self::TimeMicros(v)
551 | Self::TimestampMillis(_, v)
552 | Self::TimestampMicros(_, v) => v.push(0),
553 #[cfg(feature = "avro_custom_types")]
554 Self::DurationSecond(v)
555 | Self::DurationMillisecond(v)
556 | Self::DurationMicrosecond(v)
557 | Self::DurationNanosecond(v) => v.push(0),
558 Self::Float32(v) | Self::Int32ToFloat32(v) | Self::Int64ToFloat32(v) => v.push(0.),
559 Self::Float64(v)
560 | Self::Int32ToFloat64(v)
561 | Self::Int64ToFloat64(v)
562 | Self::Float32ToFloat64(v) => v.push(0.),
563 Self::Binary(offsets, _)
564 | Self::String(offsets, _)
565 | Self::StringView(offsets, _)
566 | Self::BytesToString(offsets, _)
567 | Self::StringToBytes(offsets, _) => {
568 offsets.push_length(0);
569 }
570 Self::Uuid(v) => {
571 v.extend([0; 16]);
572 }
573 Self::Array(_, offsets, _) => {
574 offsets.push_length(0);
575 }
576 Self::Record(_, e, _) => {
577 for encoding in e.iter_mut() {
578 encoding.append_null();
579 }
580 }
581 Self::Map(_, _koff, moff, _, _) => {
582 moff.push_length(0);
583 }
584 Self::Fixed(sz, accum) => {
585 accum.extend(std::iter::repeat_n(0u8, *sz as usize));
586 }
587 #[cfg(feature = "small_decimals")]
588 Self::Decimal32(_, _, _, builder) => builder.append_value(0),
589 #[cfg(feature = "small_decimals")]
590 Self::Decimal64(_, _, _, builder) => builder.append_value(0),
591 Self::Decimal128(_, _, _, builder) => builder.append_value(0),
592 Self::Decimal256(_, _, _, builder) => builder.append_value(i256::ZERO),
593 Self::Enum(indices, _, _) => indices.push(0),
594 Self::Duration(builder) => builder.append_null(),
595 Self::Union(u) => u.append_null()?,
596 Self::Nullable(_, null_buffer, inner, _) => {
597 null_buffer.append(false);
598 inner.append_null();
599 }
600 }
601 Ok(())
602 }
603
604 fn append_default(&mut self, lit: &AvroLiteral) -> Result<(), ArrowError> {
606 match self {
607 Self::Nullable(_, nb, inner, _) => {
608 if matches!(lit, AvroLiteral::Null) {
609 nb.append(false);
610 inner.append_null()
611 } else {
612 nb.append(true);
613 inner.append_default(lit)
614 }
615 }
616 Self::Null(count) => match lit {
617 AvroLiteral::Null => {
618 *count += 1;
619 Ok(())
620 }
621 _ => Err(ArrowError::InvalidArgumentError(
622 "Non-null default for null type".to_string(),
623 )),
624 },
625 Self::Boolean(b) => match lit {
626 AvroLiteral::Boolean(v) => {
627 b.append(*v);
628 Ok(())
629 }
630 _ => Err(ArrowError::InvalidArgumentError(
631 "Default for boolean must be boolean".to_string(),
632 )),
633 },
634 Self::Int32(v) | Self::Date32(v) | Self::TimeMillis(v) => match lit {
635 AvroLiteral::Int(i) => {
636 v.push(*i);
637 Ok(())
638 }
639 _ => Err(ArrowError::InvalidArgumentError(
640 "Default for int32/date32/time-millis must be int".to_string(),
641 )),
642 },
643 #[cfg(feature = "avro_custom_types")]
644 Self::DurationSecond(v)
645 | Self::DurationMillisecond(v)
646 | Self::DurationMicrosecond(v)
647 | Self::DurationNanosecond(v) => match lit {
648 AvroLiteral::Long(i) => {
649 v.push(*i);
650 Ok(())
651 }
652 _ => Err(ArrowError::InvalidArgumentError(
653 "Default for duration long must be long".to_string(),
654 )),
655 },
656 Self::Int64(v)
657 | Self::Int32ToInt64(v)
658 | Self::TimeMicros(v)
659 | Self::TimestampMillis(_, v)
660 | Self::TimestampMicros(_, v) => match lit {
661 AvroLiteral::Long(i) => {
662 v.push(*i);
663 Ok(())
664 }
665 AvroLiteral::Int(i) => {
666 v.push(*i as i64);
667 Ok(())
668 }
669 _ => Err(ArrowError::InvalidArgumentError(
670 "Default for long/time-micros/timestamp must be long or int".to_string(),
671 )),
672 },
673 Self::Float32(v) | Self::Int32ToFloat32(v) | Self::Int64ToFloat32(v) => match lit {
674 AvroLiteral::Float(f) => {
675 v.push(*f);
676 Ok(())
677 }
678 _ => Err(ArrowError::InvalidArgumentError(
679 "Default for float must be float".to_string(),
680 )),
681 },
682 Self::Float64(v)
683 | Self::Int32ToFloat64(v)
684 | Self::Int64ToFloat64(v)
685 | Self::Float32ToFloat64(v) => match lit {
686 AvroLiteral::Double(f) => {
687 v.push(*f);
688 Ok(())
689 }
690 _ => Err(ArrowError::InvalidArgumentError(
691 "Default for double must be double".to_string(),
692 )),
693 },
694 Self::Binary(offsets, values) | Self::StringToBytes(offsets, values) => match lit {
695 AvroLiteral::Bytes(b) => {
696 offsets.push_length(b.len());
697 values.extend_from_slice(b);
698 Ok(())
699 }
700 _ => Err(ArrowError::InvalidArgumentError(
701 "Default for bytes must be bytes".to_string(),
702 )),
703 },
704 Self::BytesToString(offsets, values)
705 | Self::String(offsets, values)
706 | Self::StringView(offsets, values) => match lit {
707 AvroLiteral::String(s) => {
708 let b = s.as_bytes();
709 offsets.push_length(b.len());
710 values.extend_from_slice(b);
711 Ok(())
712 }
713 _ => Err(ArrowError::InvalidArgumentError(
714 "Default for string must be string".to_string(),
715 )),
716 },
717 Self::Uuid(values) => match lit {
718 AvroLiteral::String(s) => {
719 let uuid = Uuid::try_parse(s).map_err(|e| {
720 ArrowError::InvalidArgumentError(format!("Invalid UUID default: {s} ({e})"))
721 })?;
722 values.extend_from_slice(uuid.as_bytes());
723 Ok(())
724 }
725 _ => Err(ArrowError::InvalidArgumentError(
726 "Default for uuid must be string".to_string(),
727 )),
728 },
729 Self::Fixed(sz, accum) => match lit {
730 AvroLiteral::Bytes(b) => {
731 if b.len() != *sz as usize {
732 return Err(ArrowError::InvalidArgumentError(format!(
733 "Fixed default length {} does not match size {sz}",
734 b.len(),
735 )));
736 }
737 accum.extend_from_slice(b);
738 Ok(())
739 }
740 _ => Err(ArrowError::InvalidArgumentError(
741 "Default for fixed must be bytes".to_string(),
742 )),
743 },
744 #[cfg(feature = "small_decimals")]
745 Self::Decimal32(_, _, _, builder) => {
746 append_decimal_default!(lit, builder, 4, i32, "decimal32")
747 }
748 #[cfg(feature = "small_decimals")]
749 Self::Decimal64(_, _, _, builder) => {
750 append_decimal_default!(lit, builder, 8, i64, "decimal64")
751 }
752 Self::Decimal128(_, _, _, builder) => {
753 append_decimal_default!(lit, builder, 16, i128, "decimal128")
754 }
755 Self::Decimal256(_, _, _, builder) => {
756 append_decimal_default!(lit, builder, 32, i256, "decimal256")
757 }
758 Self::Duration(builder) => match lit {
759 AvroLiteral::Bytes(b) => {
760 if b.len() != 12 {
761 return Err(ArrowError::InvalidArgumentError(format!(
762 "Duration default must be exactly 12 bytes, got {}",
763 b.len()
764 )));
765 }
766 let months = u32::from_le_bytes([b[0], b[1], b[2], b[3]]);
767 let days = u32::from_le_bytes([b[4], b[5], b[6], b[7]]);
768 let millis = u32::from_le_bytes([b[8], b[9], b[10], b[11]]);
769 let nanos = (millis as i64) * 1_000_000;
770 builder.append_value(IntervalMonthDayNano::new(
771 months as i32,
772 days as i32,
773 nanos,
774 ));
775 Ok(())
776 }
777 _ => Err(ArrowError::InvalidArgumentError(
778 "Default for duration must be 12-byte little-endian months/days/millis"
779 .to_string(),
780 )),
781 },
782 Self::Array(_, offsets, inner) => match lit {
783 AvroLiteral::Array(items) => {
784 offsets.push_length(items.len());
785 for item in items {
786 inner.append_default(item)?;
787 }
788 Ok(())
789 }
790 _ => Err(ArrowError::InvalidArgumentError(
791 "Default for array must be an array literal".to_string(),
792 )),
793 },
794 Self::Map(_, koff, moff, kdata, valdec) => match lit {
795 AvroLiteral::Map(entries) => {
796 moff.push_length(entries.len());
797 for (k, v) in entries {
798 let kb = k.as_bytes();
799 koff.push_length(kb.len());
800 kdata.extend_from_slice(kb);
801 valdec.append_default(v)?;
802 }
803 Ok(())
804 }
805 _ => Err(ArrowError::InvalidArgumentError(
806 "Default for map must be a map/object literal".to_string(),
807 )),
808 },
809 Self::Enum(indices, symbols, _) => match lit {
810 AvroLiteral::Enum(sym) => {
811 let pos = symbols.iter().position(|s| s == sym).ok_or_else(|| {
812 ArrowError::InvalidArgumentError(format!(
813 "Enum default symbol {sym:?} not in reader symbols"
814 ))
815 })?;
816 indices.push(pos as i32);
817 Ok(())
818 }
819 _ => Err(ArrowError::InvalidArgumentError(
820 "Default for enum must be a symbol".to_string(),
821 )),
822 },
823 Self::Union(u) => u.append_default(lit),
824 Self::Record(field_meta, decoders, projector) => match lit {
825 AvroLiteral::Map(entries) => {
826 for (i, dec) in decoders.iter_mut().enumerate() {
827 let name = field_meta[i].name();
828 if let Some(sub) = entries.get(name) {
829 dec.append_default(sub)?;
830 } else if let Some(proj) = projector.as_ref() {
831 proj.project_default(dec, i)?;
832 } else {
833 dec.append_null();
834 }
835 }
836 Ok(())
837 }
838 AvroLiteral::Null => {
839 for (i, dec) in decoders.iter_mut().enumerate() {
840 if let Some(proj) = projector.as_ref() {
841 proj.project_default(dec, i)?;
842 } else {
843 dec.append_null();
844 }
845 }
846 Ok(())
847 }
848 _ => Err(ArrowError::InvalidArgumentError(
849 "Default for record must be a map/object or null".to_string(),
850 )),
851 },
852 }
853 }
854
855 fn decode(&mut self, buf: &mut AvroCursor<'_>) -> Result<(), ArrowError> {
857 match self {
858 Self::Null(x) => *x += 1,
859 Self::Boolean(values) => values.append(buf.get_bool()?),
860 Self::Int32(values) | Self::Date32(values) | Self::TimeMillis(values) => {
861 values.push(buf.get_int()?)
862 }
863 Self::Int64(values)
864 | Self::TimeMicros(values)
865 | Self::TimestampMillis(_, values)
866 | Self::TimestampMicros(_, values) => values.push(buf.get_long()?),
867 #[cfg(feature = "avro_custom_types")]
868 Self::DurationSecond(values)
869 | Self::DurationMillisecond(values)
870 | Self::DurationMicrosecond(values)
871 | Self::DurationNanosecond(values) => values.push(buf.get_long()?),
872 Self::Float32(values) => values.push(buf.get_float()?),
873 Self::Float64(values) => values.push(buf.get_double()?),
874 Self::Int32ToInt64(values) => values.push(buf.get_int()? as i64),
875 Self::Int32ToFloat32(values) => values.push(buf.get_int()? as f32),
876 Self::Int32ToFloat64(values) => values.push(buf.get_int()? as f64),
877 Self::Int64ToFloat32(values) => values.push(buf.get_long()? as f32),
878 Self::Int64ToFloat64(values) => values.push(buf.get_long()? as f64),
879 Self::Float32ToFloat64(values) => values.push(buf.get_float()? as f64),
880 Self::StringToBytes(offsets, values)
881 | Self::BytesToString(offsets, values)
882 | Self::Binary(offsets, values)
883 | Self::String(offsets, values)
884 | Self::StringView(offsets, values) => {
885 let data = buf.get_bytes()?;
886 offsets.push_length(data.len());
887 values.extend_from_slice(data);
888 }
889 Self::Uuid(values) => {
890 let s_bytes = buf.get_bytes()?;
891 let s = std::str::from_utf8(s_bytes).map_err(|e| {
892 ArrowError::ParseError(format!("UUID bytes are not valid UTF-8: {e}"))
893 })?;
894 let uuid = Uuid::try_parse(s)
895 .map_err(|e| ArrowError::ParseError(format!("Failed to parse uuid: {e}")))?;
896 values.extend_from_slice(uuid.as_bytes());
897 }
898 Self::Array(_, off, encoding) => {
899 let total_items = read_blocks(buf, |cursor| encoding.decode(cursor))?;
900 off.push_length(total_items);
901 }
902 Self::Record(_, encodings, None) => {
903 for encoding in encodings {
904 encoding.decode(buf)?;
905 }
906 }
907 Self::Record(_, encodings, Some(proj)) => {
908 proj.project_record(buf, encodings)?;
909 }
910 Self::Map(_, koff, moff, kdata, valdec) => {
911 let newly_added = read_blocks(buf, |cur| {
912 let kb = cur.get_bytes()?;
913 koff.push_length(kb.len());
914 kdata.extend_from_slice(kb);
915 valdec.decode(cur)
916 })?;
917 moff.push_length(newly_added);
918 }
919 Self::Fixed(sz, accum) => {
920 let fx = buf.get_fixed(*sz as usize)?;
921 accum.extend_from_slice(fx);
922 }
923 #[cfg(feature = "small_decimals")]
924 Self::Decimal32(_, _, size, builder) => {
925 decode_decimal!(size, buf, builder, 4, i32);
926 }
927 #[cfg(feature = "small_decimals")]
928 Self::Decimal64(_, _, size, builder) => {
929 decode_decimal!(size, buf, builder, 8, i64);
930 }
931 Self::Decimal128(_, _, size, builder) => {
932 decode_decimal!(size, buf, builder, 16, i128);
933 }
934 Self::Decimal256(_, _, size, builder) => {
935 decode_decimal!(size, buf, builder, 32, i256);
936 }
937 Self::Enum(indices, _, None) => {
938 indices.push(buf.get_int()?);
939 }
940 Self::Enum(indices, _, Some(res)) => {
941 let raw = buf.get_int()?;
942 let resolved = usize::try_from(raw)
943 .ok()
944 .and_then(|idx| res.mapping.get(idx).copied())
945 .filter(|&idx| idx >= 0)
946 .unwrap_or(res.default_index);
947 if resolved >= 0 {
948 indices.push(resolved);
949 } else {
950 return Err(ArrowError::ParseError(format!(
951 "Enum symbol index {raw} not resolvable and no default provided",
952 )));
953 }
954 }
955 Self::Duration(builder) => {
956 let b = buf.get_fixed(12)?;
957 let months = u32::from_le_bytes(b[0..4].try_into().unwrap());
958 let days = u32::from_le_bytes(b[4..8].try_into().unwrap());
959 let millis = u32::from_le_bytes(b[8..12].try_into().unwrap());
960 let nanos = (millis as i64) * 1_000_000;
961 builder.append_value(IntervalMonthDayNano::new(months as i32, days as i32, nanos));
962 }
963 Self::Union(u) => u.decode(buf)?,
964 Self::Nullable(order, nb, encoding, plan) => {
965 match *plan {
966 NullablePlan::FromSingle { promotion } => {
967 encoding.decode_with_promotion(buf, promotion)?;
968 nb.append(true);
969 }
970 NullablePlan::ReadTag => {
971 let branch = buf.read_vlq()?;
972 let is_not_null = match *order {
973 Nullability::NullFirst => branch != 0,
974 Nullability::NullSecond => branch == 0,
975 };
976 if is_not_null {
977 encoding.decode(buf)?;
979 } else {
980 encoding.append_null();
981 }
982 nb.append(is_not_null);
983 }
984 }
985 }
986 }
987 Ok(())
988 }
989
990 fn decode_with_promotion(
991 &mut self,
992 buf: &mut AvroCursor<'_>,
993 promotion: Promotion,
994 ) -> Result<(), ArrowError> {
995 macro_rules! promote_numeric_to {
996 ($variant:ident, $getter:ident, $to:ty) => {{
997 match self {
998 Self::$variant(v) => {
999 let x = buf.$getter()?;
1000 v.push(x as $to);
1001 Ok(())
1002 }
1003 other => Err(ArrowError::ParseError(format!(
1004 "Promotion {promotion} target mismatch: expected {}, got {}",
1005 stringify!($variant),
1006 <Self as ::std::convert::AsRef<str>>::as_ref(other)
1007 ))),
1008 }
1009 }};
1010 }
1011 match promotion {
1012 Promotion::Direct => self.decode(buf),
1013 Promotion::IntToLong => promote_numeric_to!(Int64, get_int, i64),
1014 Promotion::IntToFloat => promote_numeric_to!(Float32, get_int, f32),
1015 Promotion::IntToDouble => promote_numeric_to!(Float64, get_int, f64),
1016 Promotion::LongToFloat => promote_numeric_to!(Float32, get_long, f32),
1017 Promotion::LongToDouble => promote_numeric_to!(Float64, get_long, f64),
1018 Promotion::FloatToDouble => promote_numeric_to!(Float64, get_float, f64),
1019 Promotion::StringToBytes => match self {
1020 Self::Binary(offsets, values) | Self::StringToBytes(offsets, values) => {
1021 let data = buf.get_bytes()?;
1022 offsets.push_length(data.len());
1023 values.extend_from_slice(data);
1024 Ok(())
1025 }
1026 other => Err(ArrowError::ParseError(format!(
1027 "Promotion {promotion} target mismatch: expected bytes (Binary/StringToBytes), got {}",
1028 <Self as AsRef<str>>::as_ref(other)
1029 ))),
1030 },
1031 Promotion::BytesToString => match self {
1032 Self::String(offsets, values)
1033 | Self::StringView(offsets, values)
1034 | Self::BytesToString(offsets, values) => {
1035 let data = buf.get_bytes()?;
1036 offsets.push_length(data.len());
1037 values.extend_from_slice(data);
1038 Ok(())
1039 }
1040 other => Err(ArrowError::ParseError(format!(
1041 "Promotion {promotion} target mismatch: expected string (String/StringView/BytesToString), got {}",
1042 <Self as AsRef<str>>::as_ref(other)
1043 ))),
1044 },
1045 }
1046 }
1047
1048 fn flush(&mut self, nulls: Option<NullBuffer>) -> Result<ArrayRef, ArrowError> {
1050 Ok(match self {
1051 Self::Nullable(_, n, e, _) => e.flush(n.finish())?,
1052 Self::Null(size) => Arc::new(NullArray::new(std::mem::replace(size, 0))),
1053 Self::Boolean(b) => Arc::new(BooleanArray::new(b.finish(), nulls)),
1054 Self::Int32(values) => Arc::new(flush_primitive::<Int32Type>(values, nulls)),
1055 Self::Date32(values) => Arc::new(flush_primitive::<Date32Type>(values, nulls)),
1056 Self::Int64(values) => Arc::new(flush_primitive::<Int64Type>(values, nulls)),
1057 Self::TimeMillis(values) => {
1058 Arc::new(flush_primitive::<Time32MillisecondType>(values, nulls))
1059 }
1060 Self::TimeMicros(values) => {
1061 Arc::new(flush_primitive::<Time64MicrosecondType>(values, nulls))
1062 }
1063 Self::TimestampMillis(is_utc, values) => Arc::new(
1064 flush_primitive::<TimestampMillisecondType>(values, nulls)
1065 .with_timezone_opt(is_utc.then(|| "+00:00")),
1066 ),
1067 Self::TimestampMicros(is_utc, values) => Arc::new(
1068 flush_primitive::<TimestampMicrosecondType>(values, nulls)
1069 .with_timezone_opt(is_utc.then(|| "+00:00")),
1070 ),
1071 #[cfg(feature = "avro_custom_types")]
1072 Self::DurationSecond(values) => {
1073 Arc::new(flush_primitive::<DurationSecondType>(values, nulls))
1074 }
1075 #[cfg(feature = "avro_custom_types")]
1076 Self::DurationMillisecond(values) => {
1077 Arc::new(flush_primitive::<DurationMillisecondType>(values, nulls))
1078 }
1079 #[cfg(feature = "avro_custom_types")]
1080 Self::DurationMicrosecond(values) => {
1081 Arc::new(flush_primitive::<DurationMicrosecondType>(values, nulls))
1082 }
1083 #[cfg(feature = "avro_custom_types")]
1084 Self::DurationNanosecond(values) => {
1085 Arc::new(flush_primitive::<DurationNanosecondType>(values, nulls))
1086 }
1087 Self::Float32(values) => Arc::new(flush_primitive::<Float32Type>(values, nulls)),
1088 Self::Float64(values) => Arc::new(flush_primitive::<Float64Type>(values, nulls)),
1089 Self::Int32ToInt64(values) => Arc::new(flush_primitive::<Int64Type>(values, nulls)),
1090 Self::Int32ToFloat32(values) | Self::Int64ToFloat32(values) => {
1091 Arc::new(flush_primitive::<Float32Type>(values, nulls))
1092 }
1093 Self::Int32ToFloat64(values)
1094 | Self::Int64ToFloat64(values)
1095 | Self::Float32ToFloat64(values) => {
1096 Arc::new(flush_primitive::<Float64Type>(values, nulls))
1097 }
1098 Self::StringToBytes(offsets, values) | Self::Binary(offsets, values) => {
1099 let offsets = flush_offsets(offsets);
1100 let values = flush_values(values).into();
1101 Arc::new(BinaryArray::try_new(offsets, values, nulls)?)
1102 }
1103 Self::BytesToString(offsets, values) | Self::String(offsets, values) => {
1104 let offsets = flush_offsets(offsets);
1105 let values = flush_values(values).into();
1106 Arc::new(StringArray::try_new(offsets, values, nulls)?)
1107 }
1108 Self::StringView(offsets, values) => {
1109 let offsets = flush_offsets(offsets);
1110 let values = flush_values(values);
1111 let array = StringArray::try_new(offsets, values.into(), nulls.clone())?;
1112 let values: Vec<&str> = (0..array.len())
1113 .map(|i| {
1114 if array.is_valid(i) {
1115 array.value(i)
1116 } else {
1117 ""
1118 }
1119 })
1120 .collect();
1121 Arc::new(StringViewArray::from(values))
1122 }
1123 Self::Array(field, offsets, values) => {
1124 let values = values.flush(None)?;
1125 let offsets = flush_offsets(offsets);
1126 Arc::new(ListArray::try_new(field.clone(), offsets, values, nulls)?)
1127 }
1128 Self::Record(fields, encodings, _) => {
1129 let arrays = encodings
1130 .iter_mut()
1131 .map(|x| x.flush(None))
1132 .collect::<Result<Vec<_>, _>>()?;
1133 Arc::new(StructArray::try_new(fields.clone(), arrays, nulls)?)
1134 }
1135 Self::Map(map_field, k_off, m_off, kdata, valdec) => {
1136 let moff = flush_offsets(m_off);
1137 let koff = flush_offsets(k_off);
1138 let kd = flush_values(kdata).into();
1139 let val_arr = valdec.flush(None)?;
1140 let key_arr = StringArray::try_new(koff, kd, None)?;
1141 if key_arr.len() != val_arr.len() {
1142 return Err(ArrowError::InvalidArgumentError(format!(
1143 "Map keys length ({}) != map values length ({})",
1144 key_arr.len(),
1145 val_arr.len()
1146 )));
1147 }
1148 let final_len = moff.len() - 1;
1149 if let Some(n) = &nulls {
1150 if n.len() != final_len {
1151 return Err(ArrowError::InvalidArgumentError(format!(
1152 "Map array null buffer length {} != final map length {final_len}",
1153 n.len()
1154 )));
1155 }
1156 }
1157 let entries_fields = match map_field.data_type() {
1158 DataType::Struct(fields) => fields.clone(),
1159 other => {
1160 return Err(ArrowError::InvalidArgumentError(format!(
1161 "Map entries field must be a Struct, got {other:?}"
1162 )));
1163 }
1164 };
1165 let entries_struct =
1166 StructArray::try_new(entries_fields, vec![Arc::new(key_arr), val_arr], None)?;
1167 let map_arr =
1168 MapArray::try_new(map_field.clone(), moff, entries_struct, nulls, false)?;
1169 Arc::new(map_arr)
1170 }
1171 Self::Fixed(sz, accum) => {
1172 let b: Buffer = flush_values(accum).into();
1173 let arr = FixedSizeBinaryArray::try_new(*sz, b, nulls)
1174 .map_err(|e| ArrowError::ParseError(e.to_string()))?;
1175 Arc::new(arr)
1176 }
1177 Self::Uuid(values) => {
1178 let arr = FixedSizeBinaryArray::try_new(16, std::mem::take(values).into(), nulls)
1179 .map_err(|e| ArrowError::ParseError(e.to_string()))?;
1180 Arc::new(arr)
1181 }
1182 #[cfg(feature = "small_decimals")]
1183 Self::Decimal32(precision, scale, _, builder) => {
1184 flush_decimal!(builder, precision, scale, nulls, Decimal32Array)
1185 }
1186 #[cfg(feature = "small_decimals")]
1187 Self::Decimal64(precision, scale, _, builder) => {
1188 flush_decimal!(builder, precision, scale, nulls, Decimal64Array)
1189 }
1190 Self::Decimal128(precision, scale, _, builder) => {
1191 flush_decimal!(builder, precision, scale, nulls, Decimal128Array)
1192 }
1193 Self::Decimal256(precision, scale, _, builder) => {
1194 flush_decimal!(builder, precision, scale, nulls, Decimal256Array)
1195 }
1196 Self::Enum(indices, symbols, _) => flush_dict(indices, symbols, nulls)?,
1197 Self::Duration(builder) => {
1198 let (_, vals, _) = builder.finish().into_parts();
1199 let vals = IntervalMonthDayNanoArray::try_new(vals, nulls)
1200 .map_err(|e| ArrowError::ParseError(e.to_string()))?;
1201 Arc::new(vals)
1202 }
1203 Self::Union(u) => u.flush(nulls)?,
1204 })
1205 }
1206}
1207
/// Per-writer-branch dispatch table used when resolving a writer union.
///
/// Both slices are indexed by the writer's union branch index and always have the
/// same length (one entry per writer branch).
#[derive(Debug)]
struct DispatchLookupTable {
    /// Reader branch index for each writer branch, or `NO_SOURCE` when the writer
    /// branch has no counterpart in the reader schema.
    to_reader: Box<[i8]>,
    /// Promotion to apply for each writer branch. Unmapped branches hold
    /// `Promotion::Direct` as a placeholder that `resolve` never returns.
    promotion: Box<[Promotion]>,
}
1233
/// Sentinel stored in `DispatchLookupTable::to_reader` for a writer branch that maps
/// to no reader branch. Any negative entry is treated as "unmapped" by `resolve`.
const NO_SOURCE: i8 = -1;
1237
1238impl DispatchLookupTable {
1239 fn from_writer_to_reader(
1240 promotion_map: &[Option<(usize, Promotion)>],
1241 ) -> Result<Self, ArrowError> {
1242 let mut to_reader = Vec::with_capacity(promotion_map.len());
1243 let mut promotion = Vec::with_capacity(promotion_map.len());
1244 for map in promotion_map {
1245 match *map {
1246 Some((idx, promo)) => {
1247 let idx_i8 = i8::try_from(idx).map_err(|_| {
1248 ArrowError::SchemaError(format!(
1249 "Reader branch index {idx} exceeds i8 range (max {})",
1250 i8::MAX
1251 ))
1252 })?;
1253 to_reader.push(idx_i8);
1254 promotion.push(promo);
1255 }
1256 None => {
1257 to_reader.push(NO_SOURCE);
1258 promotion.push(Promotion::Direct);
1259 }
1260 }
1261 }
1262 Ok(Self {
1263 to_reader: to_reader.into_boxed_slice(),
1264 promotion: promotion.into_boxed_slice(),
1265 })
1266 }
1267
1268 #[inline]
1270 fn resolve(&self, writer_index: usize) -> Option<(usize, Promotion)> {
1271 let reader_index = *self.to_reader.get(writer_index)?;
1272 (reader_index >= 0).then(|| (reader_index as usize, self.promotion[writer_index]))
1273 }
1274}
1275
/// Decoder state for Avro unions.
///
/// Depending on `plan` it either accumulates a dense Arrow `UnionArray`
/// (`type_ids` + `offsets` + one child decoder per branch) or forwards every value
/// to a single non-union target decoder.
#[derive(Debug)]
struct UnionDecoder {
    /// Arrow union fields handed to `UnionArray::try_new` on flush.
    fields: UnionFields,
    /// Arrow type id recorded for every emitted value.
    type_ids: Vec<i8>,
    /// Position of every emitted value inside its branch's child array.
    offsets: Vec<i32>,
    /// One child decoder per reader branch.
    branches: Vec<Decoder>,
    /// Number of values emitted so far into each branch.
    counts: Vec<i32>,
    /// Arrow type id of each reader branch, taken from `fields` in iteration order.
    reader_type_codes: Vec<i8>,
    /// Index of the first `Decoder::Null` branch, if any.
    null_branch: Option<usize>,
    /// Branch that receives default values (branch 0 when built by `try_new`).
    default_emit_idx: usize,
    /// Branch that receives nulls: `null_branch`, falling back to `default_emit_idx`.
    null_emit_idx: usize,
    /// Strategy that maps the writer's encoding onto the reader side.
    plan: UnionReadPlan,
}
1289
1290impl Default for UnionDecoder {
1291 fn default() -> Self {
1292 Self {
1293 fields: UnionFields::empty(),
1294 type_ids: Vec::new(),
1295 offsets: Vec::new(),
1296 branches: Vec::new(),
1297 counts: Vec::new(),
1298 reader_type_codes: Vec::new(),
1299 null_branch: None,
1300 default_emit_idx: 0,
1301 null_emit_idx: 0,
1302 plan: UnionReadPlan::Passthrough,
1303 }
1304 }
1305}
1306
/// How a `UnionDecoder` maps the writer's encoding onto the reader side.
#[derive(Debug)]
enum UnionReadPlan {
    /// Writer and reader are both unions: the writer's branch tag is read and
    /// translated through `lookup_table`.
    ReaderUnion {
        lookup_table: DispatchLookupTable,
    },
    /// Writer is not a union but the reader is: no tag is read and every value goes
    /// to `reader_idx` with `promotion` applied.
    FromSingle {
        reader_idx: usize,
        promotion: Promotion,
    },
    /// Writer is a union but the reader is not: the tag is read, resolved through
    /// `lookup_table`, and the value is decoded into `target`.
    ToSingle {
        target: Box<Decoder>,
        lookup_table: DispatchLookupTable,
    },
    /// No resolution information: the tag read from the data is used directly as the
    /// reader branch index.
    Passthrough,
}
1322
1323impl UnionDecoder {
1324 fn try_new(
1325 fields: UnionFields,
1326 branches: Vec<Decoder>,
1327 resolved: Option<ResolvedUnion>,
1328 ) -> Result<Self, ArrowError> {
1329 let reader_type_codes = fields.iter().map(|(tid, _)| tid).collect::<Vec<i8>>();
1330 let null_branch = branches.iter().position(|b| matches!(b, Decoder::Null(_)));
1331 let default_emit_idx = 0;
1332 let null_emit_idx = null_branch.unwrap_or(default_emit_idx);
1333 let branch_len = branches.len().max(reader_type_codes.len());
1334 let max_addr = (i32::MAX as usize) + 1;
1336 if branches.len() > max_addr {
1337 return Err(ArrowError::SchemaError(format!(
1338 "Reader union has {} branches, which exceeds the maximum addressable \
1339 branches by an Avro int tag ({} + 1).",
1340 branches.len(),
1341 i32::MAX
1342 )));
1343 }
1344 Ok(Self {
1345 fields,
1346 type_ids: Vec::with_capacity(DEFAULT_CAPACITY),
1347 offsets: Vec::with_capacity(DEFAULT_CAPACITY),
1348 branches,
1349 counts: vec![0; branch_len],
1350 reader_type_codes,
1351 null_branch,
1352 default_emit_idx,
1353 null_emit_idx,
1354 plan: Self::plan_from_resolved(resolved)?,
1355 })
1356 }
1357
1358 fn try_new_from_writer_union(
1359 info: ResolvedUnion,
1360 target: Box<Decoder>,
1361 ) -> Result<Self, ArrowError> {
1362 debug_assert!(info.writer_is_union && !info.reader_is_union);
1364 let lookup_table = DispatchLookupTable::from_writer_to_reader(&info.writer_to_reader)?;
1365 Ok(Self {
1366 plan: UnionReadPlan::ToSingle {
1367 target,
1368 lookup_table,
1369 },
1370 ..Self::default()
1371 })
1372 }
1373
1374 fn plan_from_resolved(resolved: Option<ResolvedUnion>) -> Result<UnionReadPlan, ArrowError> {
1375 let Some(info) = resolved else {
1376 return Ok(UnionReadPlan::Passthrough);
1377 };
1378 match (info.writer_is_union, info.reader_is_union) {
1379 (true, true) => {
1380 let lookup_table =
1381 DispatchLookupTable::from_writer_to_reader(&info.writer_to_reader)?;
1382 Ok(UnionReadPlan::ReaderUnion { lookup_table })
1383 }
1384 (false, true) => {
1385 let Some(&(reader_idx, promotion)) =
1386 info.writer_to_reader.first().and_then(Option::as_ref)
1387 else {
1388 return Err(ArrowError::SchemaError(
1389 "Writer type does not match any reader union branch".to_string(),
1390 ));
1391 };
1392 Ok(UnionReadPlan::FromSingle {
1393 reader_idx,
1394 promotion,
1395 })
1396 }
1397 (true, false) => Err(ArrowError::InvalidArgumentError(
1398 "UnionDecoder::try_new cannot build writer-union to single; use UnionDecoderBuilder with a target"
1399 .to_string(),
1400 )),
1401 _ => Err(ArrowError::SchemaError(
1403 "ResolvedUnion constructed for non-union sides; resolver should return None"
1404 .to_string(),
1405 )),
1406 }
1407 }
1408
1409 #[inline]
1410 fn read_tag(buf: &mut AvroCursor<'_>) -> Result<usize, ArrowError> {
1411 let raw = buf.get_long()?;
1416 if raw < 0 {
1417 return Err(ArrowError::ParseError(format!(
1418 "Negative union branch index {raw}"
1419 )));
1420 }
1421 usize::try_from(raw).map_err(|_| {
1422 ArrowError::ParseError(format!(
1423 "Union branch index {raw} does not fit into usize on this platform ({}-bit)",
1424 (usize::BITS as usize)
1425 ))
1426 })
1427 }
1428
1429 #[inline]
1430 fn emit_to(&mut self, reader_idx: usize) -> Result<&mut Decoder, ArrowError> {
1431 let branches_len = self.branches.len();
1432 let Some(reader_branch) = self.branches.get_mut(reader_idx) else {
1433 return Err(ArrowError::ParseError(format!(
1434 "Union branch index {reader_idx} out of range ({branches_len} branches)"
1435 )));
1436 };
1437 self.type_ids.push(self.reader_type_codes[reader_idx]);
1438 self.offsets.push(self.counts[reader_idx]);
1439 self.counts[reader_idx] += 1;
1440 Ok(reader_branch)
1441 }
1442
1443 #[inline]
1444 fn on_decoder<F>(&mut self, fallback_idx: usize, action: F) -> Result<(), ArrowError>
1445 where
1446 F: FnOnce(&mut Decoder) -> Result<(), ArrowError>,
1447 {
1448 if let UnionReadPlan::ToSingle { target, .. } = &mut self.plan {
1449 return action(target);
1450 }
1451 let reader_idx = match &self.plan {
1452 UnionReadPlan::FromSingle { reader_idx, .. } => *reader_idx,
1453 _ => fallback_idx,
1454 };
1455 self.emit_to(reader_idx).and_then(action)
1456 }
1457
1458 fn append_null(&mut self) -> Result<(), ArrowError> {
1459 self.on_decoder(self.null_emit_idx, |decoder| decoder.append_null())
1460 }
1461
1462 fn append_default(&mut self, lit: &AvroLiteral) -> Result<(), ArrowError> {
1463 self.on_decoder(self.default_emit_idx, |decoder| decoder.append_default(lit))
1464 }
1465
1466 fn decode(&mut self, buf: &mut AvroCursor<'_>) -> Result<(), ArrowError> {
1467 let (reader_idx, promotion) = match &mut self.plan {
1468 UnionReadPlan::Passthrough => (Self::read_tag(buf)?, Promotion::Direct),
1469 UnionReadPlan::ReaderUnion { lookup_table } => {
1470 let idx = Self::read_tag(buf)?;
1471 lookup_table.resolve(idx).ok_or_else(|| {
1472 ArrowError::ParseError(format!(
1473 "Union branch index {idx} not resolvable by reader schema"
1474 ))
1475 })?
1476 }
1477 UnionReadPlan::FromSingle {
1478 reader_idx,
1479 promotion,
1480 } => (*reader_idx, *promotion),
1481 UnionReadPlan::ToSingle {
1482 target,
1483 lookup_table,
1484 } => {
1485 let idx = Self::read_tag(buf)?;
1486 return match lookup_table.resolve(idx) {
1487 Some((_, promotion)) => target.decode_with_promotion(buf, promotion),
1488 None => Err(ArrowError::ParseError(format!(
1489 "Writer union branch {idx} does not resolve to reader type"
1490 ))),
1491 };
1492 }
1493 };
1494 let decoder = self.emit_to(reader_idx)?;
1495 decoder.decode_with_promotion(buf, promotion)
1496 }
1497
1498 fn flush(&mut self, nulls: Option<NullBuffer>) -> Result<ArrayRef, ArrowError> {
1499 if let UnionReadPlan::ToSingle { target, .. } = &mut self.plan {
1500 return target.flush(nulls);
1501 }
1502 debug_assert!(
1503 nulls.is_none(),
1504 "UnionArray does not accept a validity bitmap; \
1505 nulls should have been materialized as a Null child during decode"
1506 );
1507 let children = self
1508 .branches
1509 .iter_mut()
1510 .map(|d| d.flush(None))
1511 .collect::<Result<Vec<_>, _>>()?;
1512 let arr = UnionArray::try_new(
1513 self.fields.clone(),
1514 flush_values(&mut self.type_ids).into_iter().collect(),
1515 Some(flush_values(&mut self.offsets).into_iter().collect()),
1516 children,
1517 )
1518 .map_err(|e| ArrowError::ParseError(e.to_string()))?;
1519 Ok(Arc::new(arr))
1520 }
1521}
1522
/// Builder for `UnionDecoder`.
///
/// Two configurations are accepted by `build`: `fields` + `branches` (optionally
/// with `resolved`) and no `target`, or `resolved` + `target` with no
/// `fields`/`branches`.
#[derive(Debug, Default)]
struct UnionDecoderBuilder {
    /// Arrow union fields of the reader union.
    fields: Option<UnionFields>,
    /// One decoder per reader union branch.
    branches: Option<Vec<Decoder>>,
    /// Writer/reader resolution info, when schema resolution is in effect.
    resolved: Option<ResolvedUnion>,
    /// Single non-union decoder that receives values of a writer union.
    target: Option<Box<Decoder>>,
}
1530
1531impl UnionDecoderBuilder {
1532 fn new() -> Self {
1533 Self::default()
1534 }
1535
1536 fn with_fields(mut self, fields: UnionFields) -> Self {
1537 self.fields = Some(fields);
1538 self
1539 }
1540
1541 fn with_branches(mut self, branches: Vec<Decoder>) -> Self {
1542 self.branches = Some(branches);
1543 self
1544 }
1545
1546 fn with_resolved_union(mut self, resolved_union: ResolvedUnion) -> Self {
1547 self.resolved = Some(resolved_union);
1548 self
1549 }
1550
1551 fn with_target(mut self, target: Box<Decoder>) -> Self {
1552 self.target = Some(target);
1553 self
1554 }
1555
1556 fn build(self) -> Result<UnionDecoder, ArrowError> {
1557 match (self.resolved, self.fields, self.branches, self.target) {
1558 (resolved, Some(fields), Some(branches), None) => {
1559 UnionDecoder::try_new(fields, branches, resolved)
1560 }
1561 (Some(info), None, None, Some(target))
1562 if info.writer_is_union && !info.reader_is_union =>
1563 {
1564 UnionDecoder::try_new_from_writer_union(info, target)
1565 }
1566 _ => Err(ArrowError::InvalidArgumentError(
1567 "Invalid UnionDecoderBuilder configuration: expected either \
1568 (fields + branches + resolved) with no target for reader-unions, or \
1569 (resolved + target) with no fields/branches for writer-union to single."
1570 .to_string(),
1571 )),
1572 }
1573 }
1574}
1575
/// What `process_blockwise` does with a block whose item count is negative, i.e. a
/// block that is prefixed with its size in bytes.
#[derive(Debug, Copy, Clone)]
enum NegativeBlockBehavior {
    /// Ignore the byte size and run the item callback once per item.
    ProcessItems,
    /// Skip the whole block by its byte size without running the item callback.
    SkipBySize,
}
1581
1582#[inline]
1583fn skip_blocks(
1584 buf: &mut AvroCursor,
1585 mut skip_item: impl FnMut(&mut AvroCursor) -> Result<(), ArrowError>,
1586) -> Result<usize, ArrowError> {
1587 process_blockwise(
1588 buf,
1589 move |c| skip_item(c),
1590 NegativeBlockBehavior::SkipBySize,
1591 )
1592}
1593
1594#[inline]
1595fn flush_dict(
1596 indices: &mut Vec<i32>,
1597 symbols: &[String],
1598 nulls: Option<NullBuffer>,
1599) -> Result<ArrayRef, ArrowError> {
1600 let keys = flush_primitive::<Int32Type>(indices, nulls);
1601 let values = Arc::new(StringArray::from_iter_values(
1602 symbols.iter().map(|s| s.as_str()),
1603 ));
1604 DictionaryArray::try_new(keys, values)
1605 .map_err(|e| ArrowError::ParseError(e.to_string()))
1606 .map(|arr| Arc::new(arr) as ArrayRef)
1607}
1608
1609#[inline]
1610fn read_blocks(
1611 buf: &mut AvroCursor,
1612 decode_entry: impl FnMut(&mut AvroCursor) -> Result<(), ArrowError>,
1613) -> Result<usize, ArrowError> {
1614 process_blockwise(buf, decode_entry, NegativeBlockBehavior::ProcessItems)
1615}
1616
1617#[inline]
1618fn process_blockwise(
1619 buf: &mut AvroCursor,
1620 mut on_item: impl FnMut(&mut AvroCursor) -> Result<(), ArrowError>,
1621 negative_behavior: NegativeBlockBehavior,
1622) -> Result<usize, ArrowError> {
1623 let mut total = 0usize;
1624 loop {
1625 let block_count = buf.get_long()?;
1630 match block_count.cmp(&0) {
1631 Ordering::Equal => break,
1632 Ordering::Less => {
1633 let count = (-block_count) as usize;
1634 let size_in_bytes = buf.get_long()? as usize;
1636 match negative_behavior {
1637 NegativeBlockBehavior::ProcessItems => {
1638 for _ in 0..count {
1640 on_item(buf)?;
1641 }
1642 }
1643 NegativeBlockBehavior::SkipBySize => {
1644 let _ = buf.get_fixed(size_in_bytes)?;
1646 }
1647 }
1648 total += count;
1649 }
1650 Ordering::Greater => {
1651 let count = block_count as usize;
1652 for _ in 0..count {
1653 on_item(buf)?;
1654 }
1655 total += count;
1656 }
1657 }
1658 }
1659 Ok(total)
1660}
1661
1662#[inline]
1663fn flush_values<T>(values: &mut Vec<T>) -> Vec<T> {
1664 std::mem::replace(values, Vec::with_capacity(DEFAULT_CAPACITY))
1665}
1666
1667#[inline]
1668fn flush_offsets(offsets: &mut OffsetBufferBuilder<i32>) -> OffsetBuffer<i32> {
1669 std::mem::replace(offsets, OffsetBufferBuilder::new(DEFAULT_CAPACITY)).finish()
1670}
1671
1672#[inline]
1673fn flush_primitive<T: ArrowPrimitiveType>(
1674 values: &mut Vec<T::Native>,
1675 nulls: Option<NullBuffer>,
1676) -> PrimitiveArray<T> {
1677 PrimitiveArray::new(flush_values(values).into(), nulls)
1678}
1679
1680#[inline]
1681fn read_decimal_bytes_be<const N: usize>(
1682 buf: &mut AvroCursor<'_>,
1683 size: &Option<usize>,
1684) -> Result<[u8; N], ArrowError> {
1685 match size {
1686 Some(n) if *n == N => {
1687 let raw = buf.get_fixed(N)?;
1688 let mut arr = [0u8; N];
1689 arr.copy_from_slice(raw);
1690 Ok(arr)
1691 }
1692 Some(n) => {
1693 let raw = buf.get_fixed(*n)?;
1694 sign_cast_to::<N>(raw)
1695 }
1696 None => {
1697 let raw = buf.get_bytes()?;
1698 sign_cast_to::<N>(raw)
1699 }
1700 }
1701}
1702
1703#[inline]
1712fn sign_cast_to<const N: usize>(raw: &[u8]) -> Result<[u8; N], ArrowError> {
1713 let len = raw.len();
1714 if len == N {
1716 let mut out = [0u8; N];
1717 out.copy_from_slice(raw);
1718 return Ok(out);
1719 }
1720 let first = raw.first().copied().unwrap_or(0u8);
1722 let sign_byte = if (first & 0x80) == 0 { 0x00 } else { 0xFF };
1723 let mut out = [sign_byte; N];
1725 if len > N {
1726 let extra = len - N;
1729 if raw[..extra].iter().any(|&b| b != sign_byte) {
1731 return Err(ArrowError::ParseError(format!(
1732 "Decimal value with {} bytes cannot be represented in {} bytes without overflow",
1733 len, N
1734 )));
1735 }
1736 if N > 0 {
1737 let first_kept = raw[extra];
1738 let sign_bit_mismatch = ((first_kept ^ sign_byte) & 0x80) != 0;
1739 if sign_bit_mismatch {
1740 return Err(ArrowError::ParseError(format!(
1741 "Decimal value with {} bytes cannot be represented in {} bytes without overflow",
1742 len, N
1743 )));
1744 }
1745 }
1746 out.copy_from_slice(&raw[extra..]);
1747 return Ok(out);
1748 }
1749 out[N - len..].copy_from_slice(raw);
1750 Ok(out)
1751}
1752
/// Drives decoding of a record whose writer and reader schemas differ.
#[derive(Debug)]
struct Projector {
    /// For each writer field, in writer order: the index of the reader field decoder
    /// that receives it, or `None` when the field exists only in the writer schema.
    writer_to_reader: Arc<[Option<usize>]>,
    /// For each writer field: a skipper used to discard its value when it has no
    /// reader counterpart.
    skip_decoders: Vec<Option<Skipper>>,
    /// For each reader field: its default literal, if resolution assigned one.
    field_defaults: Vec<Option<AvroLiteral>>,
    /// `(reader field index, literal)` pairs appended after every decoded record.
    default_injections: Arc<[(usize, AvroLiteral)]>,
}
1760
/// Builder that assembles a `Projector` from a resolved record and the reader's
/// fields.
#[derive(Debug)]
struct ProjectorBuilder<'a> {
    /// Resolution result supplying the writer-to-reader mapping, the fields to skip
    /// and the fields to fill with defaults.
    rec: &'a ResolvedRecord,
    /// Reader-side fields, consulted for their default values.
    reader_fields: Arc<[AvroField]>,
}
1766
1767impl<'a> ProjectorBuilder<'a> {
1768 #[inline]
1769 fn try_new(rec: &'a ResolvedRecord, reader_fields: &Arc<[AvroField]>) -> Self {
1770 Self {
1771 rec,
1772 reader_fields: reader_fields.clone(),
1773 }
1774 }
1775
1776 #[inline]
1777 fn build(self) -> Result<Projector, ArrowError> {
1778 let reader_fields = self.reader_fields;
1779 let mut field_defaults: Vec<Option<AvroLiteral>> = Vec::with_capacity(reader_fields.len());
1780 for avro_field in reader_fields.as_ref() {
1781 if let Some(ResolutionInfo::DefaultValue(lit)) =
1782 avro_field.data_type().resolution.as_ref()
1783 {
1784 field_defaults.push(Some(lit.clone()));
1785 } else {
1786 field_defaults.push(None);
1787 }
1788 }
1789 let mut default_injections: Vec<(usize, AvroLiteral)> =
1790 Vec::with_capacity(self.rec.default_fields.len());
1791 for &idx in self.rec.default_fields.as_ref() {
1792 let lit = field_defaults
1793 .get(idx)
1794 .and_then(|lit| lit.clone())
1795 .unwrap_or(AvroLiteral::Null);
1796 default_injections.push((idx, lit));
1797 }
1798 let mut skip_decoders: Vec<Option<Skipper>> =
1799 Vec::with_capacity(self.rec.skip_fields.len());
1800 for datatype in self.rec.skip_fields.as_ref() {
1801 let skipper = match datatype {
1802 Some(datatype) => Some(Skipper::from_avro(datatype)?),
1803 None => None,
1804 };
1805 skip_decoders.push(skipper);
1806 }
1807 Ok(Projector {
1808 writer_to_reader: self.rec.writer_to_reader.clone(),
1809 skip_decoders,
1810 field_defaults,
1811 default_injections: default_injections.into(),
1812 })
1813 }
1814}
1815
1816impl Projector {
1817 #[inline]
1818 fn project_default(&self, decoder: &mut Decoder, index: usize) -> Result<(), ArrowError> {
1819 if let Some(default_literal) = self.field_defaults[index].as_ref() {
1826 decoder.append_default(default_literal)
1827 } else {
1828 decoder.append_null()
1829 }
1830 }
1831
1832 #[inline]
1833 fn project_record(
1834 &mut self,
1835 buf: &mut AvroCursor<'_>,
1836 encodings: &mut [Decoder],
1837 ) -> Result<(), ArrowError> {
1838 debug_assert_eq!(
1839 self.writer_to_reader.len(),
1840 self.skip_decoders.len(),
1841 "internal invariant: mapping and skipper lists must have equal length"
1842 );
1843 for (i, (mapping, skipper_opt)) in self
1844 .writer_to_reader
1845 .iter()
1846 .zip(self.skip_decoders.iter_mut())
1847 .enumerate()
1848 {
1849 match (mapping, skipper_opt.as_mut()) {
1850 (Some(reader_index), _) => encodings[*reader_index].decode(buf)?,
1851 (None, Some(skipper)) => skipper.skip(buf)?,
1852 (None, None) => {
1853 return Err(ArrowError::SchemaError(format!(
1854 "No skipper available for writer-only field at index {i}",
1855 )));
1856 }
1857 }
1858 }
1859 for (reader_index, lit) in self.default_injections.as_ref() {
1860 encodings[*reader_index].append_default(lit)?;
1861 }
1862 Ok(())
1863 }
1864}
1865
/// Reads and discards one encoded value of a given Avro type without materializing
/// it; used for writer fields that have no counterpart in the reader schema.
///
/// Each variant names the encoding that `skip` consumes.
#[derive(Debug)]
enum Skipper {
    /// Consumes nothing.
    Null,
    Boolean,
    Int32,
    Int64,
    Float32,
    Float64,
    /// Length-prefixed byte string.
    Bytes,
    /// Length-prefixed string.
    String,
    Date32,
    TimeMillis,
    TimeMicros,
    TimestampMillis,
    TimestampMicros,
    /// Fixed run of the given number of bytes.
    Fixed(usize),
    /// Decimal: fixed-size when a byte width is given, length-prefixed otherwise.
    Decimal(Option<usize>),
    /// UUID, skipped as a length-prefixed value.
    UuidString,
    /// Enum symbol index.
    Enum,
    /// Duration, skipped as 12 fixed bytes.
    DurationFixed12,
    /// Block-encoded array with the given item skipper.
    List(Box<Skipper>),
    /// Block-encoded map with length-prefixed keys and the given value skipper.
    Map(Box<Skipper>),
    /// Record: one skipper per field, applied in order.
    Struct(Vec<Skipper>),
    /// Union: a branch tag followed by the selected branch's value.
    Union(Vec<Skipper>),
    /// Nullable value: a branch tag, then the inner value unless it is null.
    Nullable(Nullability, Box<Skipper>),
}
1897
1898impl Skipper {
1899 fn from_avro(dt: &AvroDataType) -> Result<Self, ArrowError> {
1900 let mut base = match dt.codec() {
1901 Codec::Null => Self::Null,
1902 Codec::Boolean => Self::Boolean,
1903 Codec::Int32 | Codec::Date32 | Codec::TimeMillis => Self::Int32,
1904 Codec::Int64 => Self::Int64,
1905 Codec::TimeMicros => Self::TimeMicros,
1906 Codec::TimestampMillis(_) => Self::TimestampMillis,
1907 Codec::TimestampMicros(_) => Self::TimestampMicros,
1908 Codec::Float32 => Self::Float32,
1909 Codec::Float64 => Self::Float64,
1910 Codec::Binary => Self::Bytes,
1911 Codec::Utf8 | Codec::Utf8View => Self::String,
1912 Codec::Fixed(sz) => Self::Fixed(*sz as usize),
1913 Codec::Decimal(_, _, size) => Self::Decimal(*size),
1914 Codec::Uuid => Self::UuidString, Codec::Enum(_) => Self::Enum,
1916 Codec::List(item) => Self::List(Box::new(Skipper::from_avro(item)?)),
1917 Codec::Struct(fields) => Self::Struct(
1918 fields
1919 .iter()
1920 .map(|f| Skipper::from_avro(f.data_type()))
1921 .collect::<Result<_, _>>()?,
1922 ),
1923 Codec::Map(values) => Self::Map(Box::new(Skipper::from_avro(values)?)),
1924 Codec::Interval => Self::DurationFixed12,
1925 Codec::Union(encodings, _, _) => {
1926 let max_addr = (i32::MAX as usize) + 1;
1927 if encodings.len() > max_addr {
1928 return Err(ArrowError::SchemaError(format!(
1929 "Writer union has {} branches, which exceeds the maximum addressable \
1930 branches by an Avro int tag ({} + 1).",
1931 encodings.len(),
1932 i32::MAX
1933 )));
1934 }
1935 Self::Union(
1936 encodings
1937 .iter()
1938 .map(Skipper::from_avro)
1939 .collect::<Result<_, _>>()?,
1940 )
1941 }
1942 _ => {
1943 return Err(ArrowError::NotYetImplemented(format!(
1944 "Skipper not implemented for codec {:?}",
1945 dt.codec()
1946 )));
1947 }
1948 };
1949 if let Some(n) = dt.nullability() {
1950 base = Self::Nullable(n, Box::new(base));
1951 }
1952 Ok(base)
1953 }
1954
1955 fn skip(&mut self, buf: &mut AvroCursor<'_>) -> Result<(), ArrowError> {
1956 match self {
1957 Self::Null => Ok(()),
1958 Self::Boolean => {
1959 buf.get_bool()?;
1960 Ok(())
1961 }
1962 Self::Int32 | Self::Date32 | Self::TimeMillis => {
1963 buf.get_int()?;
1964 Ok(())
1965 }
1966 Self::Int64 | Self::TimeMicros | Self::TimestampMillis | Self::TimestampMicros => {
1967 buf.get_long()?;
1968 Ok(())
1969 }
1970 Self::Float32 => {
1971 buf.get_float()?;
1972 Ok(())
1973 }
1974 Self::Float64 => {
1975 buf.get_double()?;
1976 Ok(())
1977 }
1978 Self::Bytes | Self::String | Self::UuidString => {
1979 buf.get_bytes()?;
1980 Ok(())
1981 }
1982 Self::Fixed(sz) => {
1983 buf.get_fixed(*sz)?;
1984 Ok(())
1985 }
1986 Self::Decimal(size) => {
1987 if let Some(s) = size {
1988 buf.get_fixed(*s)
1989 } else {
1990 buf.get_bytes()
1991 }?;
1992 Ok(())
1993 }
1994 Self::Enum => {
1995 buf.get_int()?;
1996 Ok(())
1997 }
1998 Self::DurationFixed12 => {
1999 buf.get_fixed(12)?;
2000 Ok(())
2001 }
2002 Self::List(item) => {
2003 skip_blocks(buf, |c| item.skip(c))?;
2004 Ok(())
2005 }
2006 Self::Map(value) => {
2007 skip_blocks(buf, |c| {
2008 c.get_bytes()?; value.skip(c)
2010 })?;
2011 Ok(())
2012 }
2013 Self::Struct(fields) => {
2014 for f in fields.iter_mut() {
2015 f.skip(buf)?
2016 }
2017 Ok(())
2018 }
2019 Self::Union(encodings) => {
2020 let raw = buf.get_long()?;
2022 if raw < 0 {
2023 return Err(ArrowError::ParseError(format!(
2024 "Negative union branch index {raw}"
2025 )));
2026 }
2027 let idx: usize = usize::try_from(raw).map_err(|_| {
2028 ArrowError::ParseError(format!(
2029 "Union branch index {raw} does not fit into usize on this platform ({}-bit)",
2030 (usize::BITS as usize)
2031 ))
2032 })?;
2033 let Some(encoding) = encodings.get_mut(idx) else {
2034 return Err(ArrowError::ParseError(format!(
2035 "Union branch index {idx} out of range for skipper ({} branches)",
2036 encodings.len()
2037 )));
2038 };
2039 encoding.skip(buf)
2040 }
2041 Self::Nullable(order, inner) => {
2042 let branch = buf.read_vlq()?;
2043 let is_not_null = match *order {
2044 Nullability::NullFirst => branch != 0,
2045 Nullability::NullSecond => branch == 0,
2046 };
2047 if is_not_null {
2048 inner.skip(buf)?;
2049 }
2050 Ok(())
2051 }
2052 }
2053 }
2054}
2055
2056#[cfg(test)]
2057mod tests {
2058 use super::*;
2059 use crate::codec::AvroField;
2060 use crate::schema::{PrimitiveType, Schema, TypeName};
2061 use arrow_array::cast::AsArray;
2062 use indexmap::IndexMap;
2063
2064 fn encode_avro_int(value: i32) -> Vec<u8> {
2065 let mut buf = Vec::new();
2066 let mut v = (value << 1) ^ (value >> 31);
2067 while v & !0x7F != 0 {
2068 buf.push(((v & 0x7F) | 0x80) as u8);
2069 v >>= 7;
2070 }
2071 buf.push(v as u8);
2072 buf
2073 }
2074
2075 fn encode_avro_long(value: i64) -> Vec<u8> {
2076 let mut buf = Vec::new();
2077 let mut v = (value << 1) ^ (value >> 63);
2078 while v & !0x7F != 0 {
2079 buf.push(((v & 0x7F) | 0x80) as u8);
2080 v >>= 7;
2081 }
2082 buf.push(v as u8);
2083 buf
2084 }
2085
2086 fn encode_avro_bytes(bytes: &[u8]) -> Vec<u8> {
2087 let mut buf = encode_avro_long(bytes.len() as i64);
2088 buf.extend_from_slice(bytes);
2089 buf
2090 }
2091
2092 fn avro_from_codec(codec: Codec) -> AvroDataType {
2093 AvroDataType::new(codec, Default::default(), None)
2094 }
2095
2096 fn decoder_for_promotion(
2097 writer: PrimitiveType,
2098 reader: PrimitiveType,
2099 use_utf8view: bool,
2100 ) -> Decoder {
2101 let ws = Schema::TypeName(TypeName::Primitive(writer));
2102 let rs = Schema::TypeName(TypeName::Primitive(reader));
2103 let field =
2104 AvroField::resolve_from_writer_and_reader(&ws, &rs, use_utf8view, false).unwrap();
2105 Decoder::try_new(field.data_type()).unwrap()
2106 }
2107
    /// Writer union `[int, string]` read as reader union `[string, long]`: branches
    /// are reordered and the int is promoted to long in a dense union.
    #[test]
    fn test_union_resolution_writer_union_reader_union_reorder_and_promotion_dense() {
        let ws = Schema::Union(vec![
            Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
            Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
        ]);
        let rs = Schema::Union(vec![
            Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
            Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
        ]);
        let field = AvroField::resolve_from_writer_and_reader(&ws, &rs, false, false).unwrap();
        let mut dec = Decoder::try_new(field.data_type()).unwrap();
        // Record 1: writer branch 0 (int) with value 7.
        let mut rec1 = encode_avro_long(0);
        rec1.extend(encode_avro_int(7));
        let mut cur1 = AvroCursor::new(&rec1);
        dec.decode(&mut cur1).unwrap();
        // Record 2: writer branch 1 (string) with value "abc".
        let mut rec2 = encode_avro_long(1);
        rec2.extend(encode_avro_bytes("abc".as_bytes()));
        let mut cur2 = AvroCursor::new(&rec2);
        dec.decode(&mut cur2).unwrap();
        let arr = dec.flush(None).unwrap();
        let ua = arr
            .as_any()
            .downcast_ref::<UnionArray>()
            .expect("dense union output");
        assert_eq!(
            ua.type_id(0),
            1,
            "first value must select reader 'long' branch"
        );
        assert_eq!(ua.value_offset(0), 0);
        assert_eq!(
            ua.type_id(1),
            0,
            "second value must select reader 'string' branch"
        );
        assert_eq!(ua.value_offset(1), 0);
        // Each child holds exactly the one value routed to it.
        let long_child = ua.child(1).as_any().downcast_ref::<Int64Array>().unwrap();
        assert_eq!(long_child.len(), 1);
        assert_eq!(long_child.value(0), 7);
        let str_child = ua.child(0).as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(str_child.len(), 1);
        assert_eq!(str_child.value(0), "abc");
    }
2152
/// Writer union `[int, string]` read as a plain `long`: a record written with
/// the `int` branch must decode into an `Int64Array`.
#[test]
fn test_union_resolution_writer_union_reader_nonunion_promotion_int_to_long() {
    let writer_schema = Schema::Union(Vec::from(
        [PrimitiveType::Int, PrimitiveType::String]
            .map(|primitive| Schema::TypeName(TypeName::Primitive(primitive))),
    ));
    let reader_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::Long));
    let resolved =
        AvroField::resolve_from_writer_and_reader(&writer_schema, &reader_schema, false, false)
            .unwrap();
    let mut decoder = Decoder::try_new(resolved.data_type()).unwrap();

    // Writer branch index 0 followed by the int 5.
    let encoded = [encode_avro_long(0), encode_avro_int(5)].concat();
    decoder.decode(&mut AvroCursor::new(&encoded)).unwrap();

    let flushed = decoder.flush(None).unwrap();
    let longs = flushed.as_any().downcast_ref::<Int64Array>().unwrap();
    assert_eq!(longs.len(), 1);
    assert_eq!(longs.value(0), 5);
}
2171
/// Writer union `[int, string]` read as a plain `long`: a record written with
/// the `string` branch cannot be resolved, so decoding it must return an error.
#[test]
fn test_union_resolution_writer_union_reader_nonunion_mismatch_errors() {
    let writer_schema = Schema::Union(Vec::from(
        [PrimitiveType::Int, PrimitiveType::String]
            .map(|primitive| Schema::TypeName(TypeName::Primitive(primitive))),
    ));
    let reader_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::Long));
    let resolved =
        AvroField::resolve_from_writer_and_reader(&writer_schema, &reader_schema, false, false)
            .unwrap();
    let mut decoder = Decoder::try_new(resolved.data_type()).unwrap();

    // Writer branch index 1 followed by the string "z".
    let encoded = [encode_avro_long(1), encode_avro_bytes("z".as_bytes())].concat();
    let outcome = decoder.decode(&mut AvroCursor::new(&encoded));
    assert!(
        outcome.is_err(),
        "expected error when writer union branch does not resolve to reader non-union type"
    );
}
2190
/// Writer `int` read through reader union `[string, long]`: the value must be
/// stored in the reader's `long` child (type id 1) and the `string` child must
/// stay empty.
#[test]
fn test_union_resolution_writer_nonunion_reader_union_selects_matching_branch() {
    let writer_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::Int));
    let reader_schema = Schema::Union(Vec::from(
        [PrimitiveType::String, PrimitiveType::Long]
            .map(|primitive| Schema::TypeName(TypeName::Primitive(primitive))),
    ));
    let resolved =
        AvroField::resolve_from_writer_and_reader(&writer_schema, &reader_schema, false, false)
            .unwrap();
    let mut decoder = Decoder::try_new(resolved.data_type()).unwrap();

    // The writer is not a union, so the record is just the bare int.
    let encoded = encode_avro_int(6);
    decoder.decode(&mut AvroCursor::new(&encoded)).unwrap();

    let flushed = decoder.flush(None).unwrap();
    let union_arr = flushed
        .as_any()
        .downcast_ref::<UnionArray>()
        .expect("dense union output");
    assert_eq!(union_arr.len(), 1);
    assert_eq!(
        union_arr.type_id(0),
        1,
        "must resolve to reader 'long' branch (type_id 1)"
    );
    assert_eq!(union_arr.value_offset(0), 0);

    let longs = union_arr
        .child(1)
        .as_any()
        .downcast_ref::<Int64Array>()
        .unwrap();
    assert_eq!(longs.len(), 1);
    assert_eq!(longs.value(0), 6);

    let strings = union_arr
        .child(0)
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    assert_eq!(strings.len(), 0, "string branch must be empty");
}
2221
/// Writer union `[int, boolean]` against reader union `[string, long]`: a record
/// written with the `boolean` branch has no reader counterpart, so decoding it
/// must return an error.
#[test]
fn test_union_resolution_writer_union_reader_union_unmapped_branch_errors() {
    let writer_schema = Schema::Union(Vec::from(
        [PrimitiveType::Int, PrimitiveType::Boolean]
            .map(|primitive| Schema::TypeName(TypeName::Primitive(primitive))),
    ));
    let reader_schema = Schema::Union(Vec::from(
        [PrimitiveType::String, PrimitiveType::Long]
            .map(|primitive| Schema::TypeName(TypeName::Primitive(primitive))),
    ));
    let resolved =
        AvroField::resolve_from_writer_and_reader(&writer_schema, &reader_schema, false, false)
            .unwrap();
    let mut decoder = Decoder::try_new(resolved.data_type()).unwrap();

    // Writer branch index 1 followed by a single payload byte with value 1.
    let encoded = [encode_avro_long(1), vec![1]].concat();
    let outcome = decoder.decode(&mut AvroCursor::new(&encoded));
    assert!(
        outcome.is_err(),
        "expected error for unmapped writer 'boolean' branch"
    );
}
2243
/// Writer `int` promoted to reader `long`: the `Int32ToInt64` decoder is chosen
/// and every value is emitted unchanged as an `i64`.
#[test]
fn test_schema_resolution_promotion_int_to_long() {
    let mut decoder = decoder_for_promotion(PrimitiveType::Int, PrimitiveType::Long, false);
    assert!(matches!(decoder, Decoder::Int32ToInt64(_)));
    for value in [0, 1, -2, 123456] {
        decoder
            .decode(&mut AvroCursor::new(&encode_avro_int(value)))
            .unwrap();
    }
    let flushed = decoder.flush(None).unwrap();
    let longs = flushed.as_any().downcast_ref::<Int64Array>().unwrap();
    for (row, expected) in (0..).zip([0_i64, 1, -2, 123456]) {
        assert_eq!(longs.value(row), expected);
    }
}
2260
/// Writer `int` promoted to reader `float`: the `Int32ToFloat32` decoder is
/// chosen and every value is emitted as the equivalent `f32`.
#[test]
fn test_schema_resolution_promotion_int_to_float() {
    let mut decoder = decoder_for_promotion(PrimitiveType::Int, PrimitiveType::Float, false);
    assert!(matches!(decoder, Decoder::Int32ToFloat32(_)));
    for value in [0, 42, -7] {
        decoder
            .decode(&mut AvroCursor::new(&encode_avro_int(value)))
            .unwrap();
    }
    let flushed = decoder.flush(None).unwrap();
    let floats = flushed.as_any().downcast_ref::<Float32Array>().unwrap();
    for (row, expected) in (0..).zip([0.0_f32, 42.0, -7.0]) {
        assert_eq!(floats.value(row), expected);
    }
}
2276
/// Writer `int` promoted to reader `double`: the `Int32ToFloat64` decoder is
/// chosen and every value is emitted as the equivalent `f64`.
#[test]
fn test_schema_resolution_promotion_int_to_double() {
    let mut decoder = decoder_for_promotion(PrimitiveType::Int, PrimitiveType::Double, false);
    assert!(matches!(decoder, Decoder::Int32ToFloat64(_)));
    for value in [1, -1, 10_000] {
        decoder
            .decode(&mut AvroCursor::new(&encode_avro_int(value)))
            .unwrap();
    }
    let flushed = decoder.flush(None).unwrap();
    let doubles = flushed.as_any().downcast_ref::<Float64Array>().unwrap();
    for (row, expected) in (0..).zip([1.0_f64, -1.0, 10_000.0]) {
        assert_eq!(doubles.value(row), expected);
    }
}
2292
/// Writer `long` promoted to reader `float`: the `Int64ToFloat32` decoder is
/// chosen and every value is emitted as the equivalent `f32`.
#[test]
fn test_schema_resolution_promotion_long_to_float() {
    let mut decoder = decoder_for_promotion(PrimitiveType::Long, PrimitiveType::Float, false);
    assert!(matches!(decoder, Decoder::Int64ToFloat32(_)));
    for value in [0_i64, 1_000_000_i64, -123_i64] {
        decoder
            .decode(&mut AvroCursor::new(&encode_avro_long(value)))
            .unwrap();
    }
    let flushed = decoder.flush(None).unwrap();
    let floats = flushed.as_any().downcast_ref::<Float32Array>().unwrap();
    for (row, expected) in (0..).zip([0.0_f32, 1_000_000.0, -123.0]) {
        assert_eq!(floats.value(row), expected);
    }
}
2308
/// Writer `long` promoted to reader `double`: the `Int64ToFloat64` decoder is
/// chosen and every value is emitted as the equivalent `f64`.
#[test]
fn test_schema_resolution_promotion_long_to_double() {
    let mut decoder = decoder_for_promotion(PrimitiveType::Long, PrimitiveType::Double, false);
    assert!(matches!(decoder, Decoder::Int64ToFloat64(_)));
    for value in [2_i64, -2_i64, 9_223_372_i64] {
        decoder
            .decode(&mut AvroCursor::new(&encode_avro_long(value)))
            .unwrap();
    }
    let flushed = decoder.flush(None).unwrap();
    let doubles = flushed.as_any().downcast_ref::<Float64Array>().unwrap();
    for (row, expected) in (0..).zip([2.0_f64, -2.0, 9_223_372.0]) {
        assert_eq!(doubles.value(row), expected);
    }
}
2324
/// Writer `float` promoted to reader `double`: the `Float32ToFloat64` decoder is
/// chosen and every value is emitted as the equivalent `f64`.
#[test]
fn test_schema_resolution_promotion_float_to_double() {
    let mut decoder = decoder_for_promotion(PrimitiveType::Float, PrimitiveType::Double, false);
    assert!(matches!(decoder, Decoder::Float32ToFloat64(_)));
    for value in [0.5_f32, -3.25_f32, 1.0e6_f32] {
        // The test feeds each float as its 4 little-endian bytes.
        let encoded = Vec::from(value.to_le_bytes());
        decoder.decode(&mut AvroCursor::new(&encoded)).unwrap();
    }
    let flushed = decoder.flush(None).unwrap();
    let doubles = flushed.as_any().downcast_ref::<Float64Array>().unwrap();
    for (row, expected) in (0..).zip([0.5_f64, -3.25_f64, 1.0e6_f64]) {
        assert_eq!(doubles.value(row), expected);
    }
}
2340
/// Writer `bytes` promoted to reader `string`: the `BytesToString` decoder is
/// chosen and the payloads (including non-ASCII text) come back as a
/// `StringArray`.
#[test]
fn test_schema_resolution_promotion_bytes_to_string_utf8() {
    let mut decoder = decoder_for_promotion(PrimitiveType::Bytes, PrimitiveType::String, false);
    assert!(matches!(decoder, Decoder::BytesToString(_, _)));
    let texts = ["hello", "world", "héllo"];
    for text in texts {
        decoder
            .decode(&mut AvroCursor::new(&encode_avro_bytes(text.as_bytes())))
            .unwrap();
    }
    let flushed = decoder.flush(None).unwrap();
    let strings = flushed.as_any().downcast_ref::<StringArray>().unwrap();
    for (row, expected) in (0..).zip(texts) {
        assert_eq!(strings.value(row), expected);
    }
}
2356
/// Same `bytes` -> `string` promotion with `use_utf8view` set: the test expects
/// the `BytesToString` decoder and a `StringArray` result here as well.
#[test]
fn test_schema_resolution_promotion_bytes_to_string_utf8view_enabled() {
    let mut decoder = decoder_for_promotion(PrimitiveType::Bytes, PrimitiveType::String, true);
    assert!(matches!(decoder, Decoder::BytesToString(_, _)));
    let encoded = encode_avro_bytes("abc".as_bytes());
    decoder.decode(&mut AvroCursor::new(&encoded)).unwrap();
    let flushed = decoder.flush(None).unwrap();
    let strings = flushed.as_any().downcast_ref::<StringArray>().unwrap();
    assert_eq!(strings.value(0), "abc");
}
2368
/// Writer `string` promoted to reader `bytes`: the `StringToBytes` decoder is
/// chosen and the payloads (including the empty string) come back as a
/// `BinaryArray`.
#[test]
fn test_schema_resolution_promotion_string_to_bytes() {
    let mut decoder = decoder_for_promotion(PrimitiveType::String, PrimitiveType::Bytes, false);
    assert!(matches!(decoder, Decoder::StringToBytes(_, _)));
    let texts = ["", "abc", "data"];
    for text in texts {
        decoder
            .decode(&mut AvroCursor::new(&encode_avro_bytes(text.as_bytes())))
            .unwrap();
    }
    let flushed = decoder.flush(None).unwrap();
    let binaries = flushed.as_any().downcast_ref::<BinaryArray>().unwrap();
    for (row, expected) in (0..).zip(texts) {
        assert_eq!(binaries.value(row), expected.as_bytes());
    }
}
2384
/// Identical writer and reader `int` schemas: no promotion is involved, so the
/// plain `Int32` decoder is chosen and values pass through as `i32`.
#[test]
fn test_schema_resolution_no_promotion_passthrough_int() {
    let [writer_schema, reader_schema] = [PrimitiveType::Int, PrimitiveType::Int]
        .map(|primitive| Schema::TypeName(TypeName::Primitive(primitive)));
    let resolved =
        AvroField::resolve_from_writer_and_reader(&writer_schema, &reader_schema, false, false)
            .unwrap();
    let mut decoder = Decoder::try_new(resolved.data_type()).unwrap();
    assert!(matches!(decoder, Decoder::Int32(_)));
    for value in [7, -9] {
        decoder
            .decode(&mut AvroCursor::new(&encode_avro_int(value)))
            .unwrap();
    }
    let flushed = decoder.flush(None).unwrap();
    let ints = flushed.as_any().downcast_ref::<Int32Array>().unwrap();
    for (row, expected) in (0..).zip([7, -9]) {
        assert_eq!(ints.value(row), expected);
    }
}
2402
/// Writer `int` against reader `boolean` is not a legal promotion, so schema
/// resolution itself must return an error.
#[test]
fn test_schema_resolution_illegal_promotion_int_to_boolean_errors() {
    let writer_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::Int));
    let reader_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::Boolean));
    let outcome =
        AvroField::resolve_from_writer_and_reader(&writer_schema, &reader_schema, false, false);
    assert!(outcome.is_err(), "expected error for illegal promotion");
}
2410
/// Decodes a map with a single `"hello" -> "world"` entry and checks the
/// resulting `MapArray` and its key/value columns.
#[test]
fn test_map_decoding_one_entry() {
    let map_type = avro_from_codec(Codec::Map(Arc::new(avro_from_codec(Codec::Utf8))));
    let mut decoder = Decoder::try_new(&map_type).unwrap();

    // One block holding one entry (key, then value), then the 0 terminator.
    let encoded = [
        encode_avro_long(1),
        encode_avro_bytes(b"hello"),
        encode_avro_bytes(b"world"),
        encode_avro_long(0),
    ]
    .concat();
    decoder.decode(&mut AvroCursor::new(&encoded)).unwrap();

    let flushed = decoder.flush(None).unwrap();
    let maps = flushed.as_any().downcast_ref::<MapArray>().unwrap();
    assert_eq!(maps.len(), 1);
    assert_eq!(maps.value_length(0), 1);

    let first_map = maps.value(0);
    let entries = first_map.as_any().downcast_ref::<StructArray>().unwrap();
    assert_eq!(entries.len(), 1);

    // Both entry columns are expected to be UTF-8 string arrays.
    let [keys, values] = ["key", "value"].map(|column| {
        entries
            .column_by_name(column)
            .unwrap()
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap()
    });
    assert_eq!(keys.value(0), "hello");
    assert_eq!(values.value(0), "world");
}
2446
/// Decodes a map consisting only of the 0 block terminator and expects one map
/// row of length zero.
#[test]
fn test_map_decoding_empty() {
    let map_type = avro_from_codec(Codec::Map(Arc::new(avro_from_codec(Codec::Utf8))));
    let mut decoder = Decoder::try_new(&map_type).unwrap();
    let terminator = encode_avro_long(0);
    let mut cursor = AvroCursor::new(&terminator);
    decoder.decode(&mut cursor).unwrap();
    let flushed = decoder.flush(None).unwrap();
    let maps = flushed.as_any().downcast_ref::<MapArray>().unwrap();
    assert_eq!(maps.len(), 1);
    assert_eq!(maps.value_length(0), 0);
}
2459
/// Decodes two 3-byte `fixed` values and checks both the cursor advance per
/// value and the contents of the resulting `FixedSizeBinaryArray`.
#[test]
fn test_fixed_decoding() {
    let fixed_type = avro_from_codec(Codec::Fixed(3));
    let mut decoder = Decoder::try_new(&fixed_type).expect("Failed to create decoder");

    let rows: [([u8; 3], &str); 2] = [
        ([1, 2, 3], "Failed to decode data1"),
        ([4, 5, 6], "Failed to decode data2"),
    ];
    for (bytes, failure) in rows {
        let mut cursor = AvroCursor::new(&bytes);
        decoder.decode(&mut cursor).expect(failure);
        assert_eq!(cursor.position(), 3, "Cursor should advance by fixed size");
    }

    let flushed = decoder.flush(None).expect("Failed to flush decoder");
    assert_eq!(flushed.len(), 2, "Array should contain two items");
    let fixed = flushed
        .as_any()
        .downcast_ref::<FixedSizeBinaryArray>()
        .expect("Failed to downcast to FixedSizeBinaryArray");
    assert_eq!(
        fixed.value_length(),
        3,
        "Fixed size of binary values should be 3"
    );
    assert_eq!(fixed.value(0), &[1, 2, 3], "First item mismatch");
    assert_eq!(fixed.value(1), &[4, 5, 6], "Second item mismatch");
}
2499
/// Flushing a `fixed(5)` decoder that never decoded anything must produce an
/// empty `FixedSizeBinaryArray` that still reports a value length of 5.
#[test]
fn test_fixed_decoding_empty() {
    let fixed_type = avro_from_codec(Codec::Fixed(5));
    let mut decoder = Decoder::try_new(&fixed_type).expect("Failed to create decoder");

    let flushed = decoder
        .flush(None)
        .expect("Failed to flush decoder for empty input");
    assert_eq!(flushed.len(), 0, "Array should be empty");

    let fixed = flushed
        .as_any()
        .downcast_ref::<FixedSizeBinaryArray>()
        .expect("Failed to downcast to FixedSizeBinaryArray for empty array");
    assert_eq!(
        fixed.value_length(),
        5,
        "Fixed size of binary values should be 5 as per type"
    );
}
2521
/// Decodes a UUID supplied in its textual form and expects the 16 raw UUID
/// bytes in a `FixedSizeBinaryArray`.
#[test]
fn test_uuid_decoding() {
    let uuid_type = avro_from_codec(Codec::Uuid);
    let mut decoder = Decoder::try_new(&uuid_type).expect("Failed to create decoder");

    let encoded = encode_avro_bytes("f81d4fae-7dec-11d0-a765-00a0c91e6bf6".as_bytes());
    let mut cursor = AvroCursor::new(&encoded);
    decoder.decode(&mut cursor).expect("Failed to decode data");
    assert_eq!(
        cursor.position(),
        encoded.len(),
        "Cursor should advance by varint size + data size"
    );

    let flushed = decoder.flush(None).expect("Failed to flush decoder");
    let fixed = flushed
        .as_any()
        .downcast_ref::<FixedSizeBinaryArray>()
        .expect("Array should be a FixedSizeBinaryArray");
    assert_eq!(fixed.len(), 1);
    assert_eq!(fixed.value_length(), 16);

    // The same UUID as a 128-bit integer; its big-endian bytes follow the
    // order of the hex digits in the textual form.
    let expected_bytes = 0xf81d4fae_7dec_11d0_a765_00a0c91e6bf6_u128.to_be_bytes();
    assert_eq!(fixed.value(0), &expected_bytes);
}
2548
/// Decodes two array rows, `[10, 20]` and an empty one, and checks the list
/// offsets and the flattened child values.
#[test]
fn test_array_decoding() {
    let list_type = avro_from_codec(Codec::List(Arc::new(avro_from_codec(Codec::Int32))));
    let mut decoder = Decoder::try_new(&list_type).unwrap();

    // Row 1: a block of two items followed by the 0 terminator.
    let populated_row = [
        encode_avro_long(2),
        encode_avro_int(10),
        encode_avro_int(20),
        encode_avro_long(0),
    ]
    .concat();
    // Row 2: only the terminator, i.e. an empty array.
    let empty_row = encode_avro_long(0);
    for row in [&populated_row, &empty_row] {
        decoder.decode(&mut AvroCursor::new(row)).unwrap();
    }

    let flushed = decoder.flush(None).unwrap();
    let lists = flushed.as_any().downcast_ref::<ListArray>().unwrap();
    assert_eq!(lists.len(), 2);
    assert_eq!(lists.value_offsets(), &[0, 2, 2]);

    let items = lists.values().as_primitive::<Int32Type>();
    assert_eq!(items.len(), 2);
    for (idx, expected) in (0..).zip([10, 20]) {
        assert_eq!(items.value(idx), expected);
    }
}
2575
/// Decodes an array written as one block with a negative item count.
///
/// In the Avro binary encoding a negative block count means that its absolute
/// value is the number of items and that a `long` holding the block's size in
/// bytes follows immediately. The size is derived from the encoded items so the
/// fixture stays a well-formed block.
#[test]
fn test_array_decoding_with_negative_block_count() {
    let item_dt = avro_from_codec(Codec::Int32);
    let list_dt = avro_from_codec(Codec::List(Arc::new(item_dt)));
    let mut decoder = Decoder::try_new(&list_dt).unwrap();

    let items = [encode_avro_int(1), encode_avro_int(2), encode_avro_int(3)].concat();
    let block_size = i64::try_from(items.len()).unwrap();

    // Count of -3 => three items, preceded by the byte size of the block.
    let mut data = encode_avro_long(-3);
    data.extend_from_slice(&encode_avro_long(block_size));
    data.extend_from_slice(&items);
    // Terminating zero-count block.
    data.extend_from_slice(&encode_avro_long(0));

    let mut cursor = AvroCursor::new(&data);
    decoder.decode(&mut cursor).unwrap();
    let array = decoder.flush(None).unwrap();
    let list_arr = array.as_any().downcast_ref::<ListArray>().unwrap();
    assert_eq!(list_arr.len(), 1);
    assert_eq!(list_arr.value_length(0), 3);
    let values = list_arr.values().as_primitive::<Int32Type>();
    assert_eq!(values.len(), 3);
    assert_eq!(values.value(0), 1);
    assert_eq!(values.value(1), 2);
    assert_eq!(values.value(2), 3);
}
2599
/// Decodes a list-of-lists value `[[5, 6]]` and checks both nesting levels.
#[test]
fn test_nested_array_decoding() {
    let inner_ty = avro_from_codec(Codec::List(Arc::new(avro_from_codec(Codec::Int32))));
    // `inner_ty` is not needed afterwards, so it is moved rather than cloned.
    let nested_ty = avro_from_codec(Codec::List(Arc::new(inner_ty)));
    let mut decoder = Decoder::try_new(&nested_ty).unwrap();

    let mut buf = Vec::new();
    // Outer array: one block with one item (the inner array).
    buf.extend(encode_avro_long(1));
    // Inner array: one block with two items, 5 and 6.
    buf.extend(encode_avro_long(2));
    buf.extend(encode_avro_int(5));
    buf.extend(encode_avro_int(6));
    // Terminators for the inner and then the outer array.
    buf.extend(encode_avro_long(0));
    buf.extend(encode_avro_long(0));

    let mut cursor = AvroCursor::new(&buf);
    decoder.decode(&mut cursor).unwrap();
    let arr = decoder.flush(None).unwrap();

    let outer = arr.as_any().downcast_ref::<ListArray>().unwrap();
    assert_eq!(outer.len(), 1);
    assert_eq!(outer.value_length(0), 1);

    let inner = outer.values().as_any().downcast_ref::<ListArray>().unwrap();
    assert_eq!(inner.len(), 1);
    assert_eq!(inner.value_length(0), 2);

    let values = inner
        .values()
        .as_any()
        .downcast_ref::<Int32Array>()
        .unwrap();
    assert_eq!(values.values(), &[5, 6]);
}
2628
/// Decodes an array consisting only of the 0 block terminator and expects one
/// list row of length zero.
#[test]
fn test_array_decoding_empty_array() {
    let list_type = avro_from_codec(Codec::List(Arc::new(avro_from_codec(Codec::Utf8))));
    let mut decoder = Decoder::try_new(&list_type).unwrap();
    let terminator = encode_avro_long(0);
    let mut cursor = AvroCursor::new(&terminator);
    decoder.decode(&mut cursor).unwrap();
    let flushed = decoder.flush(None).unwrap();
    let lists = flushed.as_any().downcast_ref::<ListArray>().unwrap();
    assert_eq!(lists.len(), 1);
    assert_eq!(lists.value_length(0), 0);
}
2641
/// Decodes two 32-byte fixed decimals (`Codec::Decimal(50, Some(2), Some(32))`)
/// and expects a `Decimal256Array` holding 123.45 and -1.23.
#[test]
fn test_decimal_decoding_fixed256() {
    let decimal_type = avro_from_codec(Codec::Decimal(50, Some(2), Some(32)));
    let mut decoder = Decoder::try_new(&decimal_type).unwrap();

    // 32-byte big-endian two's-complement forms of 12345 and -123: the 16-byte
    // i128 encoding, sign-extended with 16 leading 0x00 / 0xFF bytes.
    let positive = [[0x00_u8; 16], 12345_i128.to_be_bytes()].concat();
    let negative = [[0xFF_u8; 16], (-123_i128).to_be_bytes()].concat();
    let encoded = [positive, negative].concat();

    let mut cursor = AvroCursor::new(&encoded);
    for _ in 0..2 {
        decoder.decode(&mut cursor).unwrap();
    }
    let flushed = decoder.flush(None).unwrap();
    let decimals = flushed.as_any().downcast_ref::<Decimal256Array>().unwrap();
    assert_eq!(decimals.len(), 2);
    for (row, expected) in (0..).zip(["123.45", "-1.23"]) {
        assert_eq!(decimals.value_as_string(row), expected);
    }
}
2668
/// Decodes two 16-byte fixed decimals (`Codec::Decimal(28, Some(2), Some(16))`)
/// and expects a `Decimal128Array` holding 123.45 and -1.23.
#[test]
fn test_decimal_decoding_fixed128() {
    let decimal_type = avro_from_codec(Codec::Decimal(28, Some(2), Some(16)));
    let mut decoder = Decoder::try_new(&decimal_type).unwrap();

    // 16-byte big-endian two's-complement forms of 12345 and -123.
    let encoded = [12345_i128.to_be_bytes(), (-123_i128).to_be_bytes()].concat();

    let mut cursor = AvroCursor::new(&encoded);
    for _ in 0..2 {
        decoder.decode(&mut cursor).unwrap();
    }
    let flushed = decoder.flush(None).unwrap();
    let decimals = flushed.as_any().downcast_ref::<Decimal128Array>().unwrap();
    assert_eq!(decimals.len(), 2);
    for (row, expected) in (0..).zip(["123.45", "-1.23"]) {
        assert_eq!(decimals.value_as_string(row), expected);
    }
}
2693
/// Decodes precision-5 decimals stored in 32-byte fixed values. The expected
/// output array is `Decimal32Array` with the `small_decimals` feature and
/// `Decimal128Array` without it.
#[test]
fn test_decimal_decoding_fixed32_from_32byte_fixed_storage() {
    #[cfg(feature = "small_decimals")]
    type ExpectedArray = Decimal32Array;
    #[cfg(not(feature = "small_decimals"))]
    type ExpectedArray = Decimal128Array;

    let decimal_type = avro_from_codec(Codec::Decimal(5, Some(2), Some(32)));
    let mut decoder = Decoder::try_new(&decimal_type).unwrap();

    // 32-byte big-endian two's-complement forms of 12345 and -123: the 16-byte
    // i128 encoding, sign-extended with 16 leading 0x00 / 0xFF bytes.
    let positive = [[0x00_u8; 16], 12345_i128.to_be_bytes()].concat();
    let negative = [[0xFF_u8; 16], (-123_i128).to_be_bytes()].concat();
    let encoded = [positive, negative].concat();

    let mut cursor = AvroCursor::new(&encoded);
    for _ in 0..2 {
        decoder.decode(&mut cursor).unwrap();
    }
    let flushed = decoder.flush(None).unwrap();
    let decimals = flushed.as_any().downcast_ref::<ExpectedArray>().unwrap();
    assert_eq!(decimals.len(), 2);
    assert_eq!(decimals.value_as_string(0), "123.45");
    assert_eq!(decimals.value_as_string(1), "-1.23");
}
2730
/// Decodes precision-5 decimals stored in 16-byte fixed values. The expected
/// output array is `Decimal32Array` with the `small_decimals` feature and
/// `Decimal128Array` without it.
#[test]
fn test_decimal_decoding_fixed32_from_16byte_fixed_storage() {
    #[cfg(feature = "small_decimals")]
    type ExpectedArray = Decimal32Array;
    #[cfg(not(feature = "small_decimals"))]
    type ExpectedArray = Decimal128Array;

    let decimal_type = avro_from_codec(Codec::Decimal(5, Some(2), Some(16)));
    let mut decoder = Decoder::try_new(&decimal_type).unwrap();

    // 16-byte big-endian two's-complement forms of 12345 and -123.
    let encoded = [12345_i128.to_be_bytes(), (-123_i128).to_be_bytes()].concat();

    let mut cursor = AvroCursor::new(&encoded);
    for _ in 0..2 {
        decoder.decode(&mut cursor).unwrap();
    }
    let flushed = decoder.flush(None).unwrap();
    let decimals = flushed.as_any().downcast_ref::<ExpectedArray>().unwrap();
    assert_eq!(decimals.len(), 2);
    assert_eq!(decimals.value_as_string(0), "123.45");
    assert_eq!(decimals.value_as_string(1), "-1.23");
}
2766
/// Decodes bytes-backed decimals through a `Decoder::Nullable` wrapper built
/// with `Nullability::NullSecond` and checks validity and values for the
/// sequence value, null, value.
#[test]
fn test_decimal_decoding_bytes_with_nulls() {
    #[cfg(feature = "small_decimals")]
    type ExpectedArray = Decimal32Array;
    #[cfg(not(feature = "small_decimals"))]
    type ExpectedArray = Decimal128Array;

    let decimal_type = avro_from_codec(Codec::Decimal(4, Some(1), None));
    let value_decoder = Decoder::try_new(&decimal_type).unwrap();
    let mut decoder = Decoder::Nullable(
        Nullability::NullSecond,
        NullBufferBuilder::new(DEFAULT_CAPACITY),
        Box::new(value_decoder),
        NullablePlan::ReadTag,
    );

    // Tag 0 introduces a value and tag 1 a null in this fixture.
    let encoded = [
        encode_avro_int(0),
        encode_avro_bytes(&[0x04, 0xD2]), // 1234
        encode_avro_int(1),
        encode_avro_int(0),
        encode_avro_bytes(&[0xFB, 0x2E]), // -1234
    ]
    .concat();

    let mut cursor = AvroCursor::new(&encoded);
    for _ in 0..3 {
        decoder.decode(&mut cursor).unwrap();
    }
    let flushed = decoder.flush(None).unwrap();
    let decimals = flushed.as_any().downcast_ref::<ExpectedArray>().unwrap();
    assert_eq!(decimals.len(), 3);
    for (row, valid) in (0..).zip([true, false, true]) {
        assert_eq!(decimals.is_valid(row), valid);
    }
    assert_eq!(decimals.value_as_string(0), "123.4");
    assert_eq!(decimals.value_as_string(2), "-123.4");
}
2809
/// Decodes nullable precision-6 decimals stored in 16-byte fixed values and
/// checks validity and values for the sequence value, null, value.
#[test]
fn test_decimal_decoding_bytes_with_nulls_fixed_size_narrow_result() {
    #[cfg(feature = "small_decimals")]
    type ExpectedArray = Decimal32Array;
    #[cfg(not(feature = "small_decimals"))]
    type ExpectedArray = Decimal128Array;

    let decimal_type = avro_from_codec(Codec::Decimal(6, Some(2), Some(16)));
    let value_decoder = Decoder::try_new(&decimal_type).unwrap();
    let mut decoder = Decoder::Nullable(
        Nullability::NullSecond,
        NullBufferBuilder::new(DEFAULT_CAPACITY),
        Box::new(value_decoder),
        NullablePlan::ReadTag,
    );

    // Tag 0 introduces a value and tag 1 a null in this fixture. The values are
    // the 16-byte big-endian two's-complement forms of 123456 and -123456.
    let encoded = [
        encode_avro_int(0),
        123456_i128.to_be_bytes().to_vec(),
        encode_avro_int(1),
        encode_avro_int(0),
        (-123456_i128).to_be_bytes().to_vec(),
    ]
    .concat();

    let mut cursor = AvroCursor::new(&encoded);
    for _ in 0..3 {
        decoder.decode(&mut cursor).unwrap();
    }
    let flushed = decoder.flush(None).unwrap();
    let decimals = flushed.as_any().downcast_ref::<ExpectedArray>().unwrap();
    assert_eq!(decimals.len(), 3);
    for (row, valid) in (0..).zip([true, false, true]) {
        assert_eq!(decimals.is_valid(row), valid);
    }
    assert_eq!(decimals.value_as_string(0), "1234.56");
    assert_eq!(decimals.value_as_string(2), "-1234.56");
}
2860
/// Decodes three enum values and expects a dictionary array whose values are
/// the symbols `A`, `B`, `C` and whose keys are the decoded indices.
#[test]
fn test_enum_decoding() {
    let symbols: Arc<[String]> = vec!["A", "B", "C"].into_iter().map(String::from).collect();
    // `symbols` is not needed afterwards, so it is moved rather than cloned.
    let avro_type = avro_from_codec(Codec::Enum(symbols));
    let mut decoder = Decoder::try_new(&avro_type).unwrap();

    // Three records holding the symbol indices 2, 0 and 1.
    let mut data = Vec::new();
    data.extend_from_slice(&encode_avro_int(2));
    data.extend_from_slice(&encode_avro_int(0));
    data.extend_from_slice(&encode_avro_int(1));

    let mut cursor = AvroCursor::new(&data);
    decoder.decode(&mut cursor).unwrap();
    decoder.decode(&mut cursor).unwrap();
    decoder.decode(&mut cursor).unwrap();
    let array = decoder.flush(None).unwrap();

    let dict_array = array
        .as_any()
        .downcast_ref::<DictionaryArray<Int32Type>>()
        .unwrap();
    assert_eq!(dict_array.len(), 3);
    let values = dict_array
        .values()
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    assert_eq!(values.value(0), "A");
    assert_eq!(values.value(1), "B");
    assert_eq!(values.value(2), "C");
    assert_eq!(dict_array.keys().values(), &[2, 0, 1]);
}
2890
/// Decodes a nullable enum (`Nullability::NullFirst`) for the sequence
/// value, null, value and checks the dictionary keys and validity.
#[test]
fn test_enum_decoding_with_nulls() {
    let symbols: Arc<[String]> = vec!["X", "Y"].into_iter().map(String::from).collect();
    // `symbols` is not needed afterwards, so it is moved rather than cloned.
    let enum_codec = Codec::Enum(symbols);
    let avro_type =
        AvroDataType::new(enum_codec, Default::default(), Some(Nullability::NullFirst));
    let mut decoder = Decoder::try_new(&avro_type).unwrap();

    // Tag 1 introduces a value and tag 0 a null in this fixture.
    let mut data = Vec::new();
    data.extend_from_slice(&encode_avro_long(1));
    data.extend_from_slice(&encode_avro_int(1));
    data.extend_from_slice(&encode_avro_long(0));
    data.extend_from_slice(&encode_avro_long(1));
    data.extend_from_slice(&encode_avro_int(0));

    let mut cursor = AvroCursor::new(&data);
    decoder.decode(&mut cursor).unwrap();
    decoder.decode(&mut cursor).unwrap();
    decoder.decode(&mut cursor).unwrap();
    let array = decoder.flush(None).unwrap();

    let dict_array = array
        .as_any()
        .downcast_ref::<DictionaryArray<Int32Type>>()
        .unwrap();
    assert_eq!(dict_array.len(), 3);
    assert!(dict_array.is_valid(0));
    assert!(dict_array.is_null(1));
    assert!(dict_array.is_valid(2));
    let expected_keys = Int32Array::from(vec![Some(1), None, Some(0)]);
    assert_eq!(dict_array.keys(), &expected_keys);
    let values = dict_array
        .values()
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    assert_eq!(values.value(0), "X");
    assert_eq!(values.value(1), "Y");
}
2927
/// Decodes a nullable `Codec::Interval` (`Nullability::NullFirst`) for the
/// sequence value, null, value and checks the `IntervalMonthDayNanoArray`.
#[test]
fn test_duration_decoding_with_nulls() {
    let avro_type = AvroDataType::new(
        Codec::Interval,
        Default::default(),
        Some(Nullability::NullFirst),
    );
    let mut decoder = Decoder::try_new(&avro_type).unwrap();

    // Packs three u32 components as consecutive little-endian words (12 bytes).
    let pack = |parts: [u32; 3]| parts.map(u32::to_le_bytes).concat();
    // Tag 1 introduces a value and tag 0 a null in this fixture.
    let encoded = [
        encode_avro_long(1),
        pack([1, 2, 3]),
        encode_avro_long(0),
        encode_avro_long(1),
        pack([4, 5, 6]),
    ]
    .concat();

    let mut cursor = AvroCursor::new(&encoded);
    for _ in 0..3 {
        decoder.decode(&mut cursor).unwrap();
    }
    let flushed = decoder.flush(None).unwrap();
    let intervals = flushed
        .as_any()
        .downcast_ref::<IntervalMonthDayNanoArray>()
        .unwrap();
    assert_eq!(intervals.len(), 3);
    assert!(intervals.is_valid(0));
    assert!(intervals.is_null(1));
    assert!(intervals.is_valid(2));

    // The third encoded component shows up multiplied by 1_000_000 in the
    // nanoseconds field of the expected output.
    let expected = IntervalMonthDayNanoArray::from(vec![
        Some(IntervalMonthDayNano::new(1, 2, 3_000_000)),
        None,
        Some(IntervalMonthDayNano::new(4, 5, 6_000_000)),
    ]);
    assert_eq!(intervals, &expected);
}
2981
/// Flushing an interval decoder that has decoded no rows must yield an empty
/// array.
#[test]
fn test_duration_decoding_empty() {
    let interval_type = AvroDataType::new(Codec::Interval, Default::default(), None);
    let flushed = Decoder::try_new(&interval_type)
        .unwrap()
        .flush(None)
        .unwrap();
    assert_eq!(flushed.len(), 0);
}
2990
/// Feeds three longs produced by `encode_avro_long` through a
/// `DurationSeconds` decoder and expects them back unchanged in a
/// `DurationSecondArray`.
#[test]
#[cfg(feature = "avro_custom_types")]
fn test_duration_seconds_decoding() {
    let seconds_type = AvroDataType::new(Codec::DurationSeconds, Default::default(), None);
    let mut decoder = Decoder::try_new(&seconds_type).unwrap();

    let mut data = Vec::new();
    for v in [0i64, -1, 2] {
        data.extend_from_slice(&encode_avro_long(v));
    }

    let mut cursor = AvroCursor::new(&data);
    for _ in 0..3 {
        decoder.decode(&mut cursor).unwrap();
    }

    let flushed = decoder.flush(None).unwrap();
    let dur = flushed
        .as_any()
        .downcast_ref::<DurationSecondArray>()
        .unwrap();
    assert_eq!(dur.values(), &[0, -1, 2]);
}
3012
/// Feeds three longs produced by `encode_avro_long` through a
/// `DurationMillis` decoder and expects them back unchanged in a
/// `DurationMillisecondArray`.
#[test]
#[cfg(feature = "avro_custom_types")]
fn test_duration_milliseconds_decoding() {
    let millis_type = AvroDataType::new(Codec::DurationMillis, Default::default(), None);
    let mut decoder = Decoder::try_new(&millis_type).unwrap();

    let data = [
        encode_avro_long(1),
        encode_avro_long(0),
        encode_avro_long(-2),
    ]
    .concat();
    let mut cursor = AvroCursor::new(&data);
    decoder.decode(&mut cursor).unwrap();
    decoder.decode(&mut cursor).unwrap();
    decoder.decode(&mut cursor).unwrap();

    let flushed = decoder.flush(None).unwrap();
    let dur = flushed
        .as_any()
        .downcast_ref::<DurationMillisecondArray>()
        .unwrap();
    assert_eq!(dur.values(), &[1, 0, -2]);
}
3033
/// Feeds three longs produced by `encode_avro_long` through a
/// `DurationMicros` decoder and expects them back unchanged in a
/// `DurationMicrosecondArray`.
#[test]
#[cfg(feature = "avro_custom_types")]
fn test_duration_microseconds_decoding() {
    let micros_type = AvroDataType::new(Codec::DurationMicros, Default::default(), None);
    let mut decoder = Decoder::try_new(&micros_type).unwrap();

    let data: Vec<u8> = [5i64, -6, 7]
        .iter()
        .flat_map(|&v| encode_avro_long(v))
        .collect();
    let mut cursor = AvroCursor::new(&data);
    (0..3).for_each(|_| decoder.decode(&mut cursor).unwrap());

    let flushed = decoder.flush(None).unwrap();
    let dur = flushed
        .as_any()
        .downcast_ref::<DurationMicrosecondArray>()
        .unwrap();
    assert_eq!(dur.values(), &[5, -6, 7]);
}
3054
/// Feeds three longs produced by `encode_avro_long` through a
/// `DurationNanos` decoder and expects them back unchanged in a
/// `DurationNanosecondArray`.
#[test]
#[cfg(feature = "avro_custom_types")]
fn test_duration_nanoseconds_decoding() {
    let nanos_type = AvroDataType::new(Codec::DurationNanos, Default::default(), None);
    let mut decoder = Decoder::try_new(&nanos_type).unwrap();

    let data = [
        encode_avro_long(8),
        encode_avro_long(9),
        encode_avro_long(-10),
    ]
    .concat();
    let mut cursor = AvroCursor::new(&data);
    let mut remaining = 3;
    while remaining > 0 {
        decoder.decode(&mut cursor).unwrap();
        remaining -= 1;
    }

    let flushed = decoder.flush(None).unwrap();
    let dur = flushed
        .as_any()
        .downcast_ref::<DurationNanosecondArray>()
        .unwrap();
    assert_eq!(dur.values(), &[8, 9, -10]);
}
3075
/// Guards against a failed decode leaving a stray entry behind in a nullable
/// decoder.
///
/// Three rows are offered to a `NullSecond` Int32 decoder: a lone tag `1`
/// (asserted below to come out null), a lone tag `0` with no payload (which
/// must fail), and tag `0` followed by 42. Only the two successful rows may
/// appear in the flushed array.
#[test]
fn test_nullable_decode_error_bitmap_corruption() {
    let nullable_int = AvroDataType::new(
        Codec::Int32,
        Default::default(),
        Some(Nullability::NullSecond),
    );
    let mut decoder = Decoder::try_new(&nullable_int).unwrap();

    let null_row = encode_avro_int(1);
    // Tag only: the value the tag announces is missing.
    let truncated_row = encode_avro_int(0);
    let value_row = [encode_avro_int(0), encode_avro_int(42)].concat();

    decoder.decode(&mut AvroCursor::new(&null_row)).unwrap();
    let failed = decoder.decode(&mut AvroCursor::new(&truncated_row));
    assert!(failed.is_err());
    decoder.decode(&mut AvroCursor::new(&value_row)).unwrap();

    let flushed = decoder.flush(None).unwrap();
    assert_eq!(flushed.len(), 2);

    let int_array = flushed.as_any().downcast_ref::<Int32Array>().unwrap();
    assert!(int_array.is_null(0));
    assert_eq!(int_array.value(1), 42);
}
3111
/// With a mapping of `[2, 0, 1]`, decoded indices 0, 1, 2 must come out as
/// dictionary keys 2, 0, 1 while the dictionary values keep the reader's
/// symbol order.
#[test]
fn test_enum_mapping_reordered_symbols() {
    let expected_symbols = ["B", "C", "A"];
    let reader_symbols: Arc<[String]> = expected_symbols.iter().map(|s| s.to_string()).collect();
    let resolution = EnumResolution {
        mapping: Arc::from(vec![2, 0, 1]),
        default_index: -1,
    };
    let mut dec = Decoder::Enum(
        Vec::with_capacity(DEFAULT_CAPACITY),
        reader_symbols,
        Some(resolution),
    );

    let data = [
        encode_avro_int(0),
        encode_avro_int(1),
        encode_avro_int(2),
    ]
    .concat();
    let mut cur = AvroCursor::new(&data);
    for _ in 0..3 {
        dec.decode(&mut cur).unwrap();
    }

    let arr = dec.flush(None).unwrap();
    let dict = arr
        .as_any()
        .downcast_ref::<DictionaryArray<Int32Type>>()
        .unwrap();
    assert_eq!(dict.keys(), &Int32Array::from(vec![2, 0, 1]));

    let dict_values = dict.values();
    let values = dict_values.as_any().downcast_ref::<StringArray>().unwrap();
    for (slot, symbol) in expected_symbols.iter().enumerate() {
        assert_eq!(values.value(slot), *symbol);
    }
}
3150
/// An index that lies outside the mapping (99 here) must resolve to the
/// configured `default_index` (1), while in-range indices map normally.
#[test]
fn test_enum_mapping_unknown_symbol_and_out_of_range_fall_back_to_default() {
    let reader_symbols: Arc<[String]> = ["A", "B"].iter().map(|s| s.to_string()).collect();
    let resolution = EnumResolution {
        mapping: Arc::from(vec![0, 1]),
        default_index: 1,
    };
    let mut dec = Decoder::Enum(
        Vec::with_capacity(DEFAULT_CAPACITY),
        reader_symbols,
        Some(resolution),
    );

    let mut data = Vec::new();
    for writer_index in [0, 1, 99] {
        data.extend_from_slice(&encode_avro_int(writer_index));
    }
    let mut cur = AvroCursor::new(&data);
    for _ in 0..3 {
        dec.decode(&mut cur).unwrap();
    }

    let arr = dec.flush(None).unwrap();
    let dict = arr
        .as_any()
        .downcast_ref::<DictionaryArray<Int32Type>>()
        .unwrap();
    // The last key is the default index, not 99.
    assert_eq!(dict.keys(), &Int32Array::from(vec![0, 1, 1]));

    let dict_values = dict.values();
    let values = dict_values.as_any().downcast_ref::<StringArray>().unwrap();
    assert_eq!(values.value(0), "A");
    assert_eq!(values.value(1), "B");
}
3187
/// When the mapping entry is `-1` and `default_index` is also `-1`, decoding
/// must fail with a message mentioning both "not resolvable" and "no default".
#[test]
fn test_enum_mapping_unknown_symbol_without_default_errors() {
    let reader_symbols: Arc<[String]> = vec!["A".to_string()].into();
    let resolution = EnumResolution {
        mapping: Arc::from(vec![-1]),
        default_index: -1,
    };
    let mut dec = Decoder::Enum(
        Vec::with_capacity(DEFAULT_CAPACITY),
        reader_symbols,
        Some(resolution),
    );

    let data = encode_avro_int(0);
    let msg = dec
        .decode(&mut AvroCursor::new(&data))
        .expect_err("expected decode error for unresolved enum without default")
        .to_string();
    let mentions_both = ["not resolvable", "no default"]
        .iter()
        .all(|needle| msg.contains(*needle));
    assert!(mentions_both, "unexpected error message: {msg}");
}
3212
3213 fn make_record_resolved_decoder(
3214 reader_fields: &[(&str, DataType, bool)],
3215 writer_to_reader: Vec<Option<usize>>,
3216 skip_decoders: Vec<Option<Skipper>>,
3217 ) -> Decoder {
3218 let mut field_refs: Vec<FieldRef> = Vec::with_capacity(reader_fields.len());
3219 let mut encodings: Vec<Decoder> = Vec::with_capacity(reader_fields.len());
3220 for (name, dt, nullable) in reader_fields {
3221 field_refs.push(Arc::new(ArrowField::new(*name, dt.clone(), *nullable)));
3222 let enc = match dt {
3223 DataType::Int32 => Decoder::Int32(Vec::new()),
3224 DataType::Int64 => Decoder::Int64(Vec::new()),
3225 DataType::Utf8 => {
3226 Decoder::String(OffsetBufferBuilder::new(DEFAULT_CAPACITY), Vec::new())
3227 }
3228 other => panic!("Unsupported test reader field type: {other:?}"),
3229 };
3230 encodings.push(enc);
3231 }
3232 let fields: Fields = field_refs.into();
3233 Decoder::Record(
3234 fields,
3235 encodings,
3236 Some(Projector {
3237 writer_to_reader: Arc::from(writer_to_reader),
3238 skip_decoders,
3239 field_defaults: vec![None; reader_fields.len()],
3240 default_injections: Arc::from(Vec::<(usize, AvroLiteral)>::new()),
3241 }),
3242 )
3243 }
3244
/// A trailing writer-only Int32 field must be consumed by its skipper: the
/// cursor ends at the end of the input and only `id` is materialised.
#[test]
fn test_skip_writer_trailing_field_int32() {
    let mut dec = make_record_resolved_decoder(
        &[("id", DataType::Int32, false)],
        vec![Some(0), None],
        vec![None, Some(Skipper::Int32)],
    );

    // First int is `id`; the second belongs to the skipped writer field.
    let data = [encode_avro_int(7), encode_avro_int(999)].concat();
    let mut cur = AvroCursor::new(&data);
    dec.decode(&mut cur).unwrap();
    assert_eq!(cur.position(), data.len());

    let arr = dec.flush(None).unwrap();
    let struct_arr = arr.as_any().downcast_ref::<StructArray>().unwrap();
    assert_eq!(struct_arr.len(), 1);

    let id_column = struct_arr.column_by_name("id").unwrap();
    let id = id_column.as_any().downcast_ref::<Int32Array>().unwrap();
    assert_eq!(id.value(0), 7);
}
3269
/// A writer-only string field sitting between two projected fields must be
/// skipped without disturbing the fields around it.
#[test]
fn test_skip_writer_middle_field_string() {
    let mut dec = make_record_resolved_decoder(
        &[
            ("id", DataType::Int32, false),
            ("score", DataType::Int64, false),
        ],
        vec![Some(0), None, Some(1)],
        vec![None, Some(Skipper::String), None],
    );

    // Writer order: id, <skipped string>, score.
    let data = [
        encode_avro_int(42),
        encode_avro_bytes(b"abcdef"),
        encode_avro_long(1000),
    ]
    .concat();
    let mut cur = AvroCursor::new(&data);
    dec.decode(&mut cur).unwrap();
    assert_eq!(cur.position(), data.len());

    let arr = dec.flush(None).unwrap();
    let s = arr.as_any().downcast_ref::<StructArray>().unwrap();

    let id_column = s.column_by_name("id").unwrap();
    let id = id_column.as_any().downcast_ref::<Int32Array>().unwrap();
    let score_column = s.column_by_name("score").unwrap();
    let score = score_column.as_any().downcast_ref::<Int64Array>().unwrap();

    assert_eq!(id.value(0), 42);
    assert_eq!(score.value(0), 1000);
}
3304
/// Skips a writer-only array whose single block is written with a negative
/// item count (-3) followed by the payload's byte length, then reads `id`.
#[test]
fn test_skip_writer_array_with_negative_block_count_fast() {
    let mut dec = make_record_resolved_decoder(
        &[("id", DataType::Int32, false)],
        vec![None, Some(0)],
        vec![Some(Skipper::List(Box::new(Skipper::Int32))), None],
    );

    let array_payload = [
        encode_avro_int(1),
        encode_avro_int(2),
        encode_avro_int(3),
    ]
    .concat();
    // Layout: count (-3), payload byte length, payload, end-of-array 0, then `id`.
    let data = [
        encode_avro_long(-3),
        encode_avro_long(array_payload.len() as i64),
        array_payload,
        encode_avro_long(0),
        encode_avro_int(5),
    ]
    .concat();

    let mut cur = AvroCursor::new(&data);
    dec.decode(&mut cur).unwrap();
    assert_eq!(cur.position(), data.len());

    let arr = dec.flush(None).unwrap();
    let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
    let id_column = s.column_by_name("id").unwrap();
    let id = id_column.as_any().downcast_ref::<Int32Array>().unwrap();
    assert_eq!(id.len(), 1);
    assert_eq!(id.value(0), 5);
}
3336
/// Skips a writer-only map whose single block is written with a negative
/// entry count (-2) followed by the entries' byte length, then reads `id`.
#[test]
fn test_skip_writer_map_with_negative_block_count_fast() {
    let mut dec = make_record_resolved_decoder(
        &[("id", DataType::Int32, false)],
        vec![None, Some(0)],
        vec![Some(Skipper::Map(Box::new(Skipper::Int32))), None],
    );

    // Two key/value pairs: ("k1", 10) and ("k2", 20).
    let entries = [
        encode_avro_bytes(b"k1"),
        encode_avro_int(10),
        encode_avro_bytes(b"k2"),
        encode_avro_int(20),
    ]
    .concat();
    // Layout: count (-2), entries byte length, entries, end-of-map 0, then `id`.
    let data = [
        encode_avro_long(-2),
        encode_avro_long(entries.len() as i64),
        entries,
        encode_avro_long(0),
        encode_avro_int(123),
    ]
    .concat();

    let mut cur = AvroCursor::new(&data);
    dec.decode(&mut cur).unwrap();
    assert_eq!(cur.position(), data.len());

    let arr = dec.flush(None).unwrap();
    let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
    let id_column = s.column_by_name("id").unwrap();
    let id = id_column.as_any().downcast_ref::<Int32Array>().unwrap();
    assert_eq!(id.len(), 1);
    assert_eq!(id.value(0), 123);
}
3369
/// Skips a writer-only nullable (`NullFirst`) Int32 field in both of its
/// shapes: tag `0` with no payload, and tag `1` followed by an int. In each
/// case the following `id` value must be read and the cursor fully consumed.
#[test]
fn test_skip_writer_nullable_field_union_nullfirst() {
    let nullable_skipper = Skipper::Nullable(Nullability::NullFirst, Box::new(Skipper::Int32));
    let mut dec = make_record_resolved_decoder(
        &[("id", DataType::Int32, false)],
        vec![None, Some(0)],
        vec![Some(nullable_skipper), None],
    );

    // Row 1: skipped field has tag 0 and no payload; id = 5.
    let row1 = [encode_avro_long(0), encode_avro_int(5)].concat();
    // Row 2: skipped field has tag 1 and the value 123; id = 7.
    let row2 = [
        encode_avro_long(1),
        encode_avro_int(123),
        encode_avro_int(7),
    ]
    .concat();

    let mut cur1 = AvroCursor::new(&row1);
    let mut cur2 = AvroCursor::new(&row2);
    dec.decode(&mut cur1).unwrap();
    dec.decode(&mut cur2).unwrap();
    assert_eq!(cur1.position(), row1.len());
    assert_eq!(cur2.position(), row2.len());

    let arr = dec.flush(None).unwrap();
    let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
    let id_column = s.column_by_name("id").unwrap();
    let id = id_column.as_any().downcast_ref::<Int32Array>().unwrap();
    assert_eq!(id.len(), 2);
    assert_eq!(id.value(0), 5);
    assert_eq!(id.value(1), 7);
}
3408
3409 fn make_dense_union_avro(
3410 children: Vec<(Codec, &'_ str, DataType)>,
3411 type_ids: Vec<i8>,
3412 ) -> AvroDataType {
3413 let mut avro_children: Vec<AvroDataType> = Vec::with_capacity(children.len());
3414 let mut fields: Vec<arrow_schema::Field> = Vec::with_capacity(children.len());
3415 for (codec, name, dt) in children.into_iter() {
3416 avro_children.push(AvroDataType::new(codec, Default::default(), None));
3417 fields.push(arrow_schema::Field::new(name, dt, true));
3418 }
3419 let union_fields = UnionFields::new(type_ids, fields);
3420 let union_codec = Codec::Union(avro_children.into(), union_fields, UnionMode::Dense);
3421 AvroDataType::new(union_codec, Default::default(), None)
3422 }
3423
/// Decodes int, string, int through a dense union whose children carry the
/// custom type ids 2 and 5, then checks type ids, per-child offsets and the
/// contents of both child arrays.
#[test]
fn test_union_dense_two_children_custom_type_ids() {
    let union_dt = make_dense_union_avro(
        vec![
            (Codec::Int32, "i", DataType::Int32),
            (Codec::Utf8, "s", DataType::Utf8),
        ],
        vec![2, 5],
    );
    let mut dec = Decoder::try_new(&union_dt).unwrap();

    // Each row is a branch index followed by that branch's value.
    let rows = [
        [encode_avro_long(0), encode_avro_int(7)].concat(),
        [encode_avro_long(1), encode_avro_bytes(b"x")].concat(),
        [encode_avro_long(0), encode_avro_int(-1)].concat(),
    ];
    for row in &rows {
        dec.decode(&mut AvroCursor::new(row)).unwrap();
    }

    let array = dec.flush(None).unwrap();
    let ua = array
        .as_any()
        .downcast_ref::<UnionArray>()
        .expect("expected UnionArray");
    assert_eq!(ua.len(), 3);

    assert_eq!(ua.type_id(0), 2);
    assert_eq!(ua.type_id(1), 5);
    assert_eq!(ua.type_id(2), 2);

    // Offsets count per child, so the second int sits at slot 1.
    assert_eq!(ua.value_offset(0), 0);
    assert_eq!(ua.value_offset(1), 0);
    assert_eq!(ua.value_offset(2), 1);

    let int_column = ua.child(2);
    let int_child = int_column
        .as_any()
        .downcast_ref::<Int32Array>()
        .expect("int child");
    assert_eq!(int_child.len(), 2);
    assert_eq!(int_child.value(0), 7);
    assert_eq!(int_child.value(1), -1);

    let str_column = ua.child(5);
    let str_child = str_column
        .as_any()
        .downcast_ref::<StringArray>()
        .expect("string child");
    assert_eq!(str_child.len(), 1);
    assert_eq!(str_child.value(0), "x");
}
3474
/// Decodes null, string, null through a dense union with a `Null` child
/// (type id 42) and a `Utf8` child (type id 7), checking that null rows are
/// counted in the `NullArray` child.
#[test]
fn test_union_dense_with_null_and_string_children() {
    let union_dt = make_dense_union_avro(
        vec![
            (Codec::Null, "n", DataType::Null),
            (Codec::Utf8, "s", DataType::Utf8),
        ],
        vec![42, 7],
    );
    let mut dec = Decoder::try_new(&union_dt).unwrap();

    // Rows for the null branch are just the branch index.
    let rows = [
        encode_avro_long(0),
        [encode_avro_long(1), encode_avro_bytes(b"abc")].concat(),
        encode_avro_long(0),
    ];
    for row in &rows {
        dec.decode(&mut AvroCursor::new(row)).unwrap();
    }

    let array = dec.flush(None).unwrap();
    let ua = array
        .as_any()
        .downcast_ref::<UnionArray>()
        .expect("expected UnionArray");
    assert_eq!(ua.len(), 3);

    assert_eq!(ua.type_id(0), 42);
    assert_eq!(ua.type_id(1), 7);
    assert_eq!(ua.type_id(2), 42);

    assert_eq!(ua.value_offset(0), 0);
    assert_eq!(ua.value_offset(1), 0);
    assert_eq!(ua.value_offset(2), 1);

    let null_column = ua.child(42);
    let null_child = null_column
        .as_any()
        .downcast_ref::<NullArray>()
        .expect("null child");
    assert_eq!(null_child.len(), 2);

    let str_column = ua.child(7);
    let str_child = str_column
        .as_any()
        .downcast_ref::<StringArray>()
        .expect("string child");
    assert_eq!(str_child.len(), 1);
    assert_eq!(str_child.value(0), "abc");
}
3519
/// A branch index of -1 must be rejected with an error mentioning
/// "Negative union branch index".
#[test]
fn test_union_decode_negative_branch_index_errors() {
    let union_dt = make_dense_union_avro(
        vec![
            (Codec::Int32, "i", DataType::Int32),
            (Codec::Utf8, "s", DataType::Utf8),
        ],
        vec![0, 1],
    );
    let mut dec = Decoder::try_new(&union_dt).unwrap();

    let row = encode_avro_long(-1);
    let mut cursor = AvroCursor::new(&row);
    let msg = dec
        .decode(&mut cursor)
        .expect_err("expected error for negative branch index")
        .to_string();
    assert!(
        msg.contains("Negative union branch index"),
        "unexpected error message: {msg}"
    );
}
3540
/// A branch index of 2 on a two-child union must be rejected with an error
/// mentioning "out of range".
#[test]
fn test_union_decode_out_of_range_branch_index_errors() {
    let union_dt = make_dense_union_avro(
        vec![
            (Codec::Int32, "i", DataType::Int32),
            (Codec::Utf8, "s", DataType::Utf8),
        ],
        vec![10, 11],
    );
    let mut dec = Decoder::try_new(&union_dt).unwrap();

    let row = encode_avro_long(2);
    let mut cursor = AvroCursor::new(&row);
    let msg = dec
        .decode(&mut cursor)
        .expect_err("expected error for out-of-range branch index")
        .to_string();
    assert!(
        msg.contains("out of range"),
        "unexpected error message: {msg}"
    );
}
3561
/// Building a decoder for a union declared with `UnionMode::Sparse` must fail
/// with the "Sparse Arrow unions are not yet supported" error.
#[test]
fn test_union_sparse_mode_not_supported() {
    let children: Vec<AvroDataType> = [Codec::Int32, Codec::Utf8]
        .into_iter()
        .map(|codec| AvroDataType::new(codec, Default::default(), None))
        .collect();
    let arrow_children = vec![
        arrow_schema::Field::new("i", DataType::Int32, true),
        arrow_schema::Field::new("s", DataType::Utf8, true),
    ];
    let uf = UnionFields::new(vec![1, 3], arrow_children);
    let dt = AvroDataType::new(
        Codec::Union(children.into(), uf, UnionMode::Sparse),
        Default::default(),
        None,
    );

    let msg = Decoder::try_new(&dt)
        .expect_err("sparse union should not be supported")
        .to_string();
    assert!(
        msg.contains("Sparse Arrow unions are not yet supported"),
        "unexpected error message: {msg}"
    );
}
3584
3585 fn make_record_decoder_with_projector_defaults(
3586 reader_fields: &[(&str, DataType, bool)],
3587 field_defaults: Vec<Option<AvroLiteral>>,
3588 default_injections: Vec<(usize, AvroLiteral)>,
3589 writer_to_reader_len: usize,
3590 ) -> Decoder {
3591 assert_eq!(
3592 field_defaults.len(),
3593 reader_fields.len(),
3594 "field_defaults must have one entry per reader field"
3595 );
3596 let mut field_refs: Vec<FieldRef> = Vec::with_capacity(reader_fields.len());
3597 let mut encodings: Vec<Decoder> = Vec::with_capacity(reader_fields.len());
3598 for (name, dt, nullable) in reader_fields {
3599 field_refs.push(Arc::new(ArrowField::new(*name, dt.clone(), *nullable)));
3600 let enc = match dt {
3601 DataType::Int32 => Decoder::Int32(Vec::with_capacity(DEFAULT_CAPACITY)),
3602 DataType::Int64 => Decoder::Int64(Vec::with_capacity(DEFAULT_CAPACITY)),
3603 DataType::Utf8 => Decoder::String(
3604 OffsetBufferBuilder::new(DEFAULT_CAPACITY),
3605 Vec::with_capacity(DEFAULT_CAPACITY),
3606 ),
3607 other => panic!("Unsupported test field type in helper: {other:?}"),
3608 };
3609 encodings.push(enc);
3610 }
3611 let fields: Fields = field_refs.into();
3612 let skip_decoders: Vec<Option<Skipper>> =
3613 (0..writer_to_reader_len).map(|_| None::<Skipper>).collect();
3614 let projector = Projector {
3615 writer_to_reader: Arc::from(vec![None; writer_to_reader_len]),
3616 skip_decoders,
3617 field_defaults,
3618 default_injections: Arc::from(default_injections),
3619 };
3620 Decoder::Record(fields, encodings, Some(projector))
3621 }
3622
/// `append_default` must accept an `Int` literal for an Int32 decoder, and
/// both `Int` and `Long` literals for an Int64 decoder.
#[test]
fn test_default_append_int32_and_int64_from_int_and_long() {
    let mut int_decoder = Decoder::Int32(Vec::with_capacity(DEFAULT_CAPACITY));
    int_decoder.append_default(&AvroLiteral::Int(42)).unwrap();
    let int_flushed = int_decoder.flush(None).unwrap();
    let ints = int_flushed.as_any().downcast_ref::<Int32Array>().unwrap();
    assert_eq!(ints.len(), 1);
    assert_eq!(ints.value(0), 42);

    let mut long_decoder = Decoder::Int64(Vec::with_capacity(DEFAULT_CAPACITY));
    for literal in [AvroLiteral::Int(5), AvroLiteral::Long(7)] {
        long_decoder.append_default(&literal).unwrap();
    }
    let long_flushed = long_decoder.flush(None).unwrap();
    let longs = long_flushed.as_any().downcast_ref::<Int64Array>().unwrap();
    assert_eq!(longs.len(), 2);
    assert_eq!(longs.value(0), 5);
    assert_eq!(longs.value(1), 7);
}
3640
/// `append_default` must store a `Float` literal in a Float32 decoder and a
/// `Double` literal in a Float64 decoder unchanged.
#[test]
fn test_default_append_floats_and_doubles() {
    let mut float_decoder = Decoder::Float32(Vec::with_capacity(DEFAULT_CAPACITY));
    float_decoder
        .append_default(&AvroLiteral::Float(1.5))
        .unwrap();
    let float_flushed = float_decoder.flush(None).unwrap();
    let floats = float_flushed
        .as_any()
        .downcast_ref::<Float32Array>()
        .unwrap();
    assert_eq!(floats.value(0), 1.5);

    let mut double_decoder = Decoder::Float64(Vec::with_capacity(DEFAULT_CAPACITY));
    double_decoder
        .append_default(&AvroLiteral::Double(2.25))
        .unwrap();
    let double_flushed = double_decoder.flush(None).unwrap();
    let doubles = double_flushed
        .as_any()
        .downcast_ref::<Float64Array>()
        .unwrap();
    assert_eq!(doubles.value(0), 2.25);
}
3654
/// Covers three `append_default` cases: a string literal into a String
/// decoder, a bytes literal into a Binary decoder, and a bytes literal into a
/// String decoder, which must be rejected.
#[test]
fn test_default_append_string_and_bytes() {
    // Happy path: string literal -> string decoder.
    let mut string_decoder = Decoder::String(
        OffsetBufferBuilder::new(DEFAULT_CAPACITY),
        Vec::with_capacity(DEFAULT_CAPACITY),
    );
    string_decoder
        .append_default(&AvroLiteral::String("hi".into()))
        .unwrap();
    let string_flushed = string_decoder.flush(None).unwrap();
    let strings = string_flushed
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    assert_eq!(strings.value(0), "hi");

    // Happy path: bytes literal -> binary decoder.
    let mut binary_decoder = Decoder::Binary(
        OffsetBufferBuilder::new(DEFAULT_CAPACITY),
        Vec::with_capacity(DEFAULT_CAPACITY),
    );
    binary_decoder
        .append_default(&AvroLiteral::Bytes(vec![1, 2, 3]))
        .unwrap();
    let binary_flushed = binary_decoder.flush(None).unwrap();
    let binaries = binary_flushed
        .as_any()
        .downcast_ref::<BinaryArray>()
        .unwrap();
    assert_eq!(binaries.value(0), &[1, 2, 3]);

    // Type mismatch: bytes literal -> string decoder.
    let mut rejecting_decoder = Decoder::String(
        OffsetBufferBuilder::new(DEFAULT_CAPACITY),
        Vec::with_capacity(DEFAULT_CAPACITY),
    );
    let err = rejecting_decoder
        .append_default(&AvroLiteral::Bytes(vec![0x61, 0x62]))
        .unwrap_err();
    assert!(
        err.to_string()
            .contains("Default for string must be string"),
        "unexpected error: {err:?}"
    );
}
3690
/// A nullable Int32 decoder must accept both a `Null` default (producing a
/// null slot) and an `Int` default (producing a valid value).
#[test]
fn test_default_append_nullable_int32_null_and_value() {
    let mut dec = Decoder::Nullable(
        Nullability::NullFirst,
        NullBufferBuilder::new(DEFAULT_CAPACITY),
        Box::new(Decoder::Int32(Vec::with_capacity(DEFAULT_CAPACITY))),
        NullablePlan::ReadTag,
    );
    for literal in [AvroLiteral::Null, AvroLiteral::Int(11)] {
        dec.append_default(&literal).unwrap();
    }

    let flushed = dec.flush(None).unwrap();
    let ints = flushed.as_any().downcast_ref::<Int32Array>().unwrap();
    assert_eq!(ints.len(), 2);
    assert!(ints.is_null(0));
    assert_eq!(ints.value(1), 11);
}
3708
/// An `Array` default of three ints appended to a list-of-Int32 decoder must
/// produce a single list row holding `[1, 2, 3]`.
#[test]
fn test_default_append_array_of_ints() {
    let item_dt = avro_from_codec(Codec::Int32);
    let list_dt = avro_from_codec(Codec::List(Arc::new(item_dt)));
    let mut decoder = Decoder::try_new(&list_dt).unwrap();

    let items: Vec<AvroLiteral> = (1..=3).map(AvroLiteral::Int).collect();
    decoder.append_default(&AvroLiteral::Array(items)).unwrap();

    let flushed = decoder.flush(None).unwrap();
    let list = flushed.as_any().downcast_ref::<ListArray>().unwrap();
    assert_eq!(list.len(), 1);
    assert_eq!(list.value_length(0), 3);

    let child = list.values();
    let vals = child.as_any().downcast_ref::<Int32Array>().unwrap();
    assert_eq!(vals.values(), &[1, 2, 3]);
}
3726
/// A `Map` default with two entries appended to a map-of-Int32 decoder must
/// produce one map row containing both keys and both values. Keys and values
/// are compared as sets, so entry order is not checked.
#[test]
fn test_default_append_map_string_to_int() {
    let value_dt = avro_from_codec(Codec::Int32);
    let map_dt = avro_from_codec(Codec::Map(Arc::new(value_dt)));
    let mut decoder = Decoder::try_new(&map_dt).unwrap();

    let default_entries: IndexMap<String, AvroLiteral> = vec![
        ("k1".to_string(), AvroLiteral::Int(10)),
        ("k2".to_string(), AvroLiteral::Int(20)),
    ]
    .into_iter()
    .collect();
    decoder
        .append_default(&AvroLiteral::Map(default_entries))
        .unwrap();

    let flushed = decoder.flush(None).unwrap();
    let map = flushed.as_any().downcast_ref::<MapArray>().unwrap();
    assert_eq!(map.len(), 1);
    assert_eq!(map.value_length(0), 2);

    let binding = map.value(0);
    let entries = binding.as_any().downcast_ref::<StructArray>().unwrap();
    let key_column = entries.column_by_name("key").unwrap();
    let k = key_column.as_any().downcast_ref::<StringArray>().unwrap();
    let value_column = entries.column_by_name("value").unwrap();
    let v = value_column.as_any().downcast_ref::<Int32Array>().unwrap();

    let keys: std::collections::HashSet<&str> = (0..k.len()).map(|i| k.value(i)).collect();
    assert_eq!(keys, ["k1", "k2"].into_iter().collect());
    let vals: std::collections::HashSet<i32> = (0..v.len()).map(|i| v.value(i)).collect();
    assert_eq!(vals, [10, 20].into_iter().collect());
}
3758
/// An `Enum("B")` default appended to an enum decoder with symbols A, B, C
/// must be stored as dictionary key 1.
#[test]
fn test_default_append_enum_by_symbol() {
    let symbols: Arc<[String]> = ["A", "B", "C"].iter().map(|s| s.to_string()).collect();
    let mut decoder = Decoder::Enum(Vec::with_capacity(DEFAULT_CAPACITY), symbols, None);
    decoder
        .append_default(&AvroLiteral::Enum("B".into()))
        .unwrap();

    let flushed = decoder.flush(None).unwrap();
    let dict = flushed
        .as_any()
        .downcast_ref::<DictionaryArray<Int32Type>>()
        .unwrap();
    assert_eq!(dict.len(), 1);
    assert_eq!(dict.keys(), &Int32Array::from(vec![1]));

    let dict_values = dict.values();
    let values = dict_values.as_any().downcast_ref::<StringArray>().unwrap();
    assert_eq!(values.value(1), "B");
}
3779
/// A UUID decoder must accept a string default (stored as a 16-byte fixed-size
/// binary value) and reject a bytes default with a descriptive error.
#[test]
fn test_default_append_uuid_and_type_error() {
    let mut accepting = Decoder::Uuid(Vec::with_capacity(DEFAULT_CAPACITY));
    let literal = AvroLiteral::String("123e4567-e89b-12d3-a456-426614174000".into());
    accepting.append_default(&literal).unwrap();
    let flushed = accepting.flush(None).unwrap();
    let arr = flushed
        .as_any()
        .downcast_ref::<FixedSizeBinaryArray>()
        .unwrap();
    assert_eq!(arr.value_length(), 16);
    assert_eq!(arr.len(), 1);

    // Raw bytes are not an acceptable UUID default, even at the right length.
    let mut rejecting = Decoder::Uuid(Vec::with_capacity(DEFAULT_CAPACITY));
    let err = rejecting
        .append_default(&AvroLiteral::Bytes(vec![0u8; 16]))
        .unwrap_err();
    assert!(
        err.to_string().contains("Default for uuid must be string"),
        "unexpected error: {err:?}"
    );
}
3802
/// A `Fixed(4)` decoder must accept a 4-byte default and reject a 3-byte one
/// with a "Fixed default length" error.
#[test]
fn test_default_append_fixed_and_length_mismatch() {
    let mut accepting = Decoder::Fixed(4, Vec::with_capacity(DEFAULT_CAPACITY));
    accepting
        .append_default(&AvroLiteral::Bytes(vec![1, 2, 3, 4]))
        .unwrap();
    let flushed = accepting.flush(None).unwrap();
    let arr = flushed
        .as_any()
        .downcast_ref::<FixedSizeBinaryArray>()
        .unwrap();
    assert_eq!(arr.value_length(), 4);
    assert_eq!(arr.value(0), &[1, 2, 3, 4]);

    // One byte short of the declared size.
    let mut rejecting = Decoder::Fixed(4, Vec::with_capacity(DEFAULT_CAPACITY));
    let err = rejecting
        .append_default(&AvroLiteral::Bytes(vec![1, 2, 3]))
        .unwrap_err();
    assert!(
        err.to_string().contains("Fixed default length"),
        "unexpected error: {err:?}"
    );
}
3824
/// An interval decoder must accept a 12-byte default made of three
/// little-endian `u32` words (asserted to surface as months, days, and the
/// third word times 1_000_000 in nanoseconds) and reject an 11-byte default.
#[test]
fn test_default_append_duration_and_length_validation() {
    let mut accepting = Decoder::try_new(&avro_from_codec(Codec::Interval)).unwrap();
    let bytes: Vec<u8> = [1u32, 2, 3]
        .iter()
        .flat_map(|word| word.to_le_bytes())
        .collect();
    accepting.append_default(&AvroLiteral::Bytes(bytes)).unwrap();

    let flushed = accepting.flush(None).unwrap();
    let arr = flushed
        .as_any()
        .downcast_ref::<IntervalMonthDayNanoArray>()
        .unwrap();
    assert_eq!(arr.len(), 1);
    let IntervalMonthDayNano {
        months,
        days,
        nanoseconds,
    } = arr.value(0);
    assert_eq!(months, 1);
    assert_eq!(days, 2);
    assert_eq!(nanoseconds, 3_000_000);

    // One byte short of the required payload.
    let mut rejecting = Decoder::try_new(&avro_from_codec(Codec::Interval)).unwrap();
    let err = rejecting
        .append_default(&AvroLiteral::Bytes(vec![0u8; 11]))
        .unwrap_err();
    assert!(
        err.to_string()
            .contains("Duration default must be exactly 12 bytes"),
        "unexpected error: {err:?}"
    );
}
3854
/// Appends two 32-byte big-endian two's-complement defaults to a
/// `Decimal(50, 2)` decoder with fixed size 32 and checks they render as
/// "123.45" (0x3039 = 12345) and "-1.23" (0xFF..85 = -123).
#[test]
fn test_default_append_decimal256_from_bytes() {
    let dt = avro_from_codec(Codec::Decimal(50, Some(2), Some(32)));
    let mut decoder = Decoder::try_new(&dt).unwrap();

    // Positive value: all zero except the two low-order bytes.
    let mut pos = [0x00u8; 32];
    pos[30] = 0x30;
    pos[31] = 0x39;
    decoder
        .append_default(&AvroLiteral::Bytes(pos.to_vec()))
        .unwrap();

    // Negative value: sign-extended 0xFF bytes with 0x85 in the lowest byte.
    let mut neg = [0xFFu8; 32];
    neg[31] = 0x85;
    decoder
        .append_default(&AvroLiteral::Bytes(neg.to_vec()))
        .unwrap();

    let flushed = decoder.flush(None).unwrap();
    let dec = flushed.as_any().downcast_ref::<Decimal256Array>().unwrap();
    assert_eq!(dec.len(), 2);
    assert_eq!(dec.value_as_string(0), "123.45");
    assert_eq!(dec.value_as_string(1), "-1.23");
}
3877
/// When a record default (given as a map) omits field `b`, the record decoder
/// must fill it from the projector's per-field default ("hi"), while field `a`
/// takes the value supplied in the map.
#[test]
fn test_record_append_default_map_missing_fields_uses_projector_field_defaults() {
    let mut rec = make_record_decoder_with_projector_defaults(
        &[("a", DataType::Int32, false), ("b", DataType::Utf8, false)],
        vec![None, Some(AvroLiteral::String("hi".into()))],
        vec![],
        0,
    );

    // Only `a` is present in the default.
    let mut provided: IndexMap<String, AvroLiteral> = IndexMap::new();
    provided.insert("a".to_string(), AvroLiteral::Int(7));
    rec.append_default(&AvroLiteral::Map(provided)).unwrap();

    let flushed = rec.flush(None).unwrap();
    let s = flushed.as_any().downcast_ref::<StructArray>().unwrap();
    let a_column = s.column_by_name("a").unwrap();
    let a = a_column.as_any().downcast_ref::<Int32Array>().unwrap();
    let b_column = s.column_by_name("b").unwrap();
    let b = b_column.as_any().downcast_ref::<StringArray>().unwrap();
    assert_eq!(a.value(0), 7);
    assert_eq!(b.value(0), "hi");
}
3907
/// Appending a `Null` default to a record whose fields both carry per-field
/// defaults is expected to populate every column from those defaults.
#[test]
fn test_record_append_default_null_uses_projector_field_defaults() {
    let mut record = make_record_decoder_with_projector_defaults(
        &[("a", DataType::Int32, false), ("b", DataType::Utf8, false)],
        vec![
            Some(AvroLiteral::Int(5)),
            Some(AvroLiteral::String("x".into())),
        ],
        vec![],
        0,
    );

    // No per-field values are supplied at all.
    record.append_default(&AvroLiteral::Null).unwrap();

    let flushed = record.flush(None).unwrap();
    let row = flushed.as_any().downcast_ref::<StructArray>().unwrap();

    let column_a = row.column_by_name("a").unwrap();
    let ints = column_a.as_any().downcast_ref::<Int32Array>().unwrap();
    let column_b = row.column_by_name("b").unwrap();
    let strings = column_b.as_any().downcast_ref::<StringArray>().unwrap();

    assert_eq!(ints.value(0), 5);
    assert_eq!(strings.value(0), "x");
}
3938
/// With a projector whose `field_defaults` are all `None`, a map default
/// that only names `a` must leave `a` valid with the supplied value and `b`
/// null.
#[test]
fn test_record_append_default_missing_fields_without_projector_defaults_yields_type_nulls_or_empties(
) {
    // Both columns are nullable.
    let field_refs: Vec<FieldRef> = [("a", DataType::Int32), ("b", DataType::Utf8)]
        .into_iter()
        .map(|(name, dt)| Arc::new(ArrowField::new(name, dt, true)))
        .collect();

    // Wraps a child decoder in the nullable variant used for both columns.
    let wrap_nullable = |inner: Decoder| {
        Decoder::Nullable(
            Nullability::NullSecond,
            NullBufferBuilder::new(DEFAULT_CAPACITY),
            Box::new(inner),
            NullablePlan::ReadTag,
        )
    };
    let child_decoders: Vec<Decoder> = vec![
        wrap_nullable(Decoder::Int32(Vec::with_capacity(DEFAULT_CAPACITY))),
        wrap_nullable(Decoder::String(
            OffsetBufferBuilder::new(DEFAULT_CAPACITY),
            Vec::with_capacity(DEFAULT_CAPACITY),
        )),
    ];

    // The projector carries no defaults and no injections.
    let projector = Projector {
        writer_to_reader: Arc::from(vec![]),
        skip_decoders: vec![],
        field_defaults: vec![None, None],
        default_injections: Arc::from(Vec::<(usize, AvroLiteral)>::new()),
    };
    let mut record = Decoder::Record(field_refs.into(), child_decoders, Some(projector));

    // Only `a` is present in the default map.
    let partial: IndexMap<String, AvroLiteral> = [("a".to_string(), AvroLiteral::Int(9))]
        .into_iter()
        .collect();
    record.append_default(&AvroLiteral::Map(partial)).unwrap();

    let flushed = record.flush(None).unwrap();
    let row = flushed.as_any().downcast_ref::<StructArray>().unwrap();

    let column_a = row.column_by_name("a").unwrap();
    let ints = column_a.as_any().downcast_ref::<Int32Array>().unwrap();
    let column_b = row.column_by_name("b").unwrap();
    let strings = column_b.as_any().downcast_ref::<StringArray>().unwrap();

    assert!(ints.is_valid(0));
    assert_eq!(ints.value(0), 9);
    assert!(strings.is_null(0));
}
3993
/// Decodes from an empty buffer with injections registered for both
/// columns, then checks that the flushed row holds exactly the injected
/// literals.
#[test]
fn test_projector_default_injection_when_writer_lacks_fields() {
    let mut record = make_record_decoder_with_projector_defaults(
        &[
            ("id", DataType::Int32, false),
            ("name", DataType::Utf8, false),
        ],
        // No per-field defaults; values are supplied via the injections.
        vec![None, None],
        vec![
            (0, AvroLiteral::Int(99)),
            (1, AvroLiteral::String("alice".into())),
        ],
        0,
    );

    // The cursor has no bytes to read from.
    let mut empty_input = AvroCursor::new(&[]);
    record.decode(&mut empty_input).unwrap();

    let flushed = record.flush(None).unwrap();
    let row = flushed.as_any().downcast_ref::<StructArray>().unwrap();

    let id_column = row.column_by_name("id").unwrap();
    let ids = id_column.as_any().downcast_ref::<Int32Array>().unwrap();
    let name_column = row.column_by_name("name").unwrap();
    let names = name_column
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();

    assert_eq!(ids.value(0), 99);
    assert_eq!(names.value(0), "alice");
}
4028
/// Builds a dense union whose Arrow type ids (42, 7) differ from the Avro
/// branch indexes (0, 1) and checks that the decoded array reports the type
/// ids declared in `UnionFields`, with one value in each child.
#[test]
fn union_type_ids_are_not_child_indexes() {
    let branches = vec![avro_from_codec(Codec::Int32), avro_from_codec(Codec::Utf8)];
    let fields: UnionFields = [
        (42_i8, Arc::new(ArrowField::new("a", DataType::Int32, true))),
        (7_i8, Arc::new(ArrowField::new("b", DataType::Utf8, true))),
    ]
    .into_iter()
    .collect();
    let union_type = avro_from_codec(Codec::Union(
        branches.into(),
        fields.clone(),
        UnionMode::Dense,
    ));
    let mut decoder = Decoder::try_new(&union_type).expect("decoder");

    // First row: branch index 1 followed by the string payload "hi".
    let mut string_row = encode_avro_long(1);
    string_row.extend(encode_avro_bytes("hi".as_bytes()));
    decoder
        .decode(&mut AvroCursor::new(&string_row))
        .expect("decode b1");

    // Second row: branch index 0 followed by the int payload 5.
    let mut int_row = encode_avro_long(0);
    int_row.extend(encode_avro_int(5));
    decoder
        .decode(&mut AvroCursor::new(&int_row))
        .expect("decode b0");

    let flushed = decoder.flush(None).expect("flush");
    let union = flushed
        .as_any()
        .downcast_ref::<UnionArray>()
        .expect("union");
    assert_eq!(union.len(), 2);

    // Rows carry the declared type ids, not the branch positions.
    for (row, expected_type_id) in [(0_usize, 7_i8), (1, 42)] {
        assert_eq!(
            union.type_id(row),
            expected_type_id,
            "type id must come from UnionFields"
        );
    }
    // Each row is the first value stored in its own child.
    for row in 0..2_usize {
        assert_eq!(union.value_offset(row), 0);
    }

    let strings = union
        .child(7)
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    assert_eq!(strings.len(), 1);
    assert_eq!(strings.value(0), "hi");

    let ints = union
        .child(42)
        .as_any()
        .downcast_ref::<Int32Array>()
        .unwrap();
    assert_eq!(ints.len(), 1);
    assert_eq!(ints.value(0), 5);

    // The declared ordering of type ids is left untouched.
    let declared_ids: Vec<i8> = fields.iter().map(|entry| entry.0).collect();
    assert_eq!(declared_ids, vec![42_i8, 7_i8]);
}
4067}