1use crate::codec::{
21 AvroDataType, AvroField, AvroLiteral, Codec, Promotion, ResolutionInfo, ResolvedRecord,
22 ResolvedUnion,
23};
24use crate::reader::cursor::AvroCursor;
25use crate::schema::Nullability;
26#[cfg(feature = "small_decimals")]
27use arrow_array::builder::{Decimal32Builder, Decimal64Builder};
28use arrow_array::builder::{Decimal128Builder, Decimal256Builder, IntervalMonthDayNanoBuilder};
29use arrow_array::types::*;
30use arrow_array::*;
31use arrow_buffer::*;
32use arrow_schema::{
33 ArrowError, DECIMAL128_MAX_PRECISION, DECIMAL256_MAX_PRECISION, DataType, Field as ArrowField,
34 FieldRef, Fields, Schema as ArrowSchema, SchemaRef, UnionFields, UnionMode,
35};
36#[cfg(feature = "small_decimals")]
37use arrow_schema::{DECIMAL32_MAX_PRECISION, DECIMAL64_MAX_PRECISION};
38#[cfg(feature = "avro_custom_types")]
39use arrow_select::take::{TakeOptions, take};
40use std::cmp::Ordering;
41use std::sync::Arc;
42use strum_macros::AsRefStr;
43use uuid::Uuid;
44
/// Initial capacity handed to the value buffers, offset builders and null
/// buffer builders that `Decoder::try_new_internal` creates.
const DEFAULT_CAPACITY: usize = 1024;
46
47#[derive(Clone, Copy, Debug)]
49enum NullablePlan {
50 ReadTag,
52 FromSingle { promotion: Promotion },
55}
56
/// Reads one decimal through `read_decimal_bytes_be::<N>` and appends it to
/// `$builder` as a `$Int` built from the returned big-endian bytes.
///
/// Expands to code that uses `?`, so it must be invoked inside a function that
/// returns a compatible `Result`.
macro_rules! decode_decimal {
    ($size:expr, $buf:expr, $builder:expr, $N:expr, $Int:ty) => {{
        let be_bytes = read_decimal_bytes_be::<{ $N }>($buf, $size)?;
        let value = <$Int>::from_be_bytes(be_bytes);
        $builder.append_value(value);
    }};
}
64
/// Finishes `$builder`, keeps only the second component of `into_parts()`,
/// and rebuilds a `$ArrayTy` from it with the caller-supplied `$nulls`, the
/// stored precision and the stored scale (0 when the scale is `None`).
///
/// A failure from `with_precision_and_scale` is re-wrapped as
/// `ArrowError::ParseError`. The expansion uses `?` and evaluates to an
/// `ArrayRef`.
macro_rules! flush_decimal {
    ($builder:expr, $precision:expr, $scale:expr, $nulls:expr, $ArrayTy:ty) => {{
        let (_, values, _) = $builder.finish().into_parts();
        let array = <$ArrayTy>::try_new(values, $nulls)?;
        let array = array
            .with_precision_and_scale(*$precision as u8, $scale.unwrap_or(0) as i8)
            .map_err(|err| ArrowError::ParseError(err.to_string()))?;
        Arc::new(array) as ArrayRef
    }};
}
75
/// Appends a decimal default value to `$builder`.
///
/// A `AvroLiteral::Bytes` literal is widened to `$N` bytes with
/// `sign_cast_to::<$N>` and interpreted as a big-endian `$Int`; any other
/// literal yields `ArrowError::InvalidArgumentError` naming `$name`.
/// The expansion evaluates to a `Result<(), ArrowError>` and uses `?`.
macro_rules! append_decimal_default {
    ($lit:expr, $builder:expr, $N:literal, $Int:ty, $name:literal) => {{
        if let AvroLiteral::Bytes(raw) = $lit {
            let widened = sign_cast_to::<$N>(raw)?;
            $builder.append_value(<$Int>::from_be_bytes(widened));
            Ok(())
        } else {
            Err(ArrowError::InvalidArgumentError(
                concat!(
                    "Default for ",
                    $name,
                    " must be bytes (two's-complement big-endian)"
                )
                .to_string(),
            ))
        }
    }};
}
98
99#[derive(Debug)]
101pub(crate) struct RecordDecoder {
102 schema: SchemaRef,
103 fields: Vec<Decoder>,
104 projector: Option<Projector>,
105}
106
107impl RecordDecoder {
108 pub(crate) fn try_new_with_options(data_type: &AvroDataType) -> Result<Self, ArrowError> {
119 match data_type.codec() {
120 Codec::Struct(reader_fields) => {
121 let mut arrow_fields = Vec::with_capacity(reader_fields.len());
123 let mut encodings = Vec::with_capacity(reader_fields.len());
124 for avro_field in reader_fields.iter() {
125 arrow_fields.push(avro_field.field());
126 encodings.push(Decoder::try_new(avro_field.data_type())?);
127 }
128 let projector = match data_type.resolution.as_ref() {
129 Some(ResolutionInfo::Record(rec)) => {
130 Some(ProjectorBuilder::try_new(rec, reader_fields).build()?)
131 }
132 _ => None,
133 };
134 Ok(Self {
135 schema: Arc::new(ArrowSchema::new(arrow_fields)),
136 fields: encodings,
137 projector,
138 })
139 }
140 other => Err(ArrowError::ParseError(format!(
141 "Expected record got {other:?}"
142 ))),
143 }
144 }
145
146 pub(crate) fn schema(&self) -> &SchemaRef {
148 &self.schema
149 }
150
151 pub(crate) fn decode(&mut self, buf: &[u8], count: usize) -> Result<usize, ArrowError> {
153 let mut cursor = AvroCursor::new(buf);
154 match self.projector.as_mut() {
155 Some(proj) => {
156 for _ in 0..count {
157 proj.project_record(&mut cursor, &mut self.fields)?;
158 }
159 }
160 None => {
161 for _ in 0..count {
162 for field in &mut self.fields {
163 field.decode(&mut cursor)?;
164 }
165 }
166 }
167 }
168 Ok(cursor.position())
169 }
170
171 pub(crate) fn flush(&mut self) -> Result<RecordBatch, ArrowError> {
173 let arrays = self
174 .fields
175 .iter_mut()
176 .map(|x| x.flush(None))
177 .collect::<Result<Vec<_>, _>>()?;
178 RecordBatch::try_new(self.schema.clone(), arrays)
179 }
180}
181
/// Lookup data used when decoding an enum whose written indices must be
/// translated before they are stored.
#[derive(Debug)]
struct EnumResolution {
    /// Indexed by the raw index read from the input; a negative entry is
    /// treated as "no mapping" by `Decoder::decode`.
    mapping: Arc<[i32]>,
    /// Index used when the raw index has no usable mapping; a negative value
    /// makes `Decoder::decode` fail instead.
    default_index: i32,
}
187
188#[derive(Debug, AsRefStr)]
189enum Decoder {
190 Null(usize),
191 Boolean(BooleanBufferBuilder),
192 Int32(Vec<i32>),
193 Int64(Vec<i64>),
194 #[cfg(feature = "avro_custom_types")]
195 DurationSecond(Vec<i64>),
196 #[cfg(feature = "avro_custom_types")]
197 DurationMillisecond(Vec<i64>),
198 #[cfg(feature = "avro_custom_types")]
199 DurationMicrosecond(Vec<i64>),
200 #[cfg(feature = "avro_custom_types")]
201 DurationNanosecond(Vec<i64>),
202 Float32(Vec<f32>),
203 Float64(Vec<f64>),
204 Date32(Vec<i32>),
205 TimeMillis(Vec<i32>),
206 TimeMicros(Vec<i64>),
207 TimestampMillis(bool, Vec<i64>),
208 TimestampMicros(bool, Vec<i64>),
209 TimestampNanos(bool, Vec<i64>),
210 Int32ToInt64(Vec<i64>),
211 Int32ToFloat32(Vec<f32>),
212 Int32ToFloat64(Vec<f64>),
213 Int64ToFloat32(Vec<f32>),
214 Int64ToFloat64(Vec<f64>),
215 Float32ToFloat64(Vec<f64>),
216 BytesToString(OffsetBufferBuilder<i32>, Vec<u8>),
217 StringToBytes(OffsetBufferBuilder<i32>, Vec<u8>),
218 Binary(OffsetBufferBuilder<i32>, Vec<u8>),
219 String(OffsetBufferBuilder<i32>, Vec<u8>),
221 StringView(OffsetBufferBuilder<i32>, Vec<u8>),
223 Array(FieldRef, OffsetBufferBuilder<i32>, Box<Decoder>),
224 Record(Fields, Vec<Decoder>, Option<Projector>),
225 Map(
226 FieldRef,
227 OffsetBufferBuilder<i32>,
228 OffsetBufferBuilder<i32>,
229 Vec<u8>,
230 Box<Decoder>,
231 ),
232 Fixed(i32, Vec<u8>),
233 Enum(Vec<i32>, Arc<[String]>, Option<EnumResolution>),
234 Duration(IntervalMonthDayNanoBuilder),
235 Uuid(Vec<u8>),
236 #[cfg(feature = "small_decimals")]
237 Decimal32(usize, Option<usize>, Option<usize>, Decimal32Builder),
238 #[cfg(feature = "small_decimals")]
239 Decimal64(usize, Option<usize>, Option<usize>, Decimal64Builder),
240 Decimal128(usize, Option<usize>, Option<usize>, Decimal128Builder),
241 Decimal256(usize, Option<usize>, Option<usize>, Decimal256Builder),
242 #[cfg(feature = "avro_custom_types")]
243 RunEndEncoded(u8, usize, Box<Decoder>),
244 Union(UnionDecoder),
245 Nullable(Nullability, NullBufferBuilder, Box<Decoder>, NullablePlan),
246}
247
248impl Decoder {
249 fn try_new(data_type: &AvroDataType) -> Result<Self, ArrowError> {
250 if let Some(ResolutionInfo::Union(info)) = data_type.resolution.as_ref() {
251 if info.writer_is_union && !info.reader_is_union {
252 let mut clone = data_type.clone();
253 clone.resolution = None; let target = Box::new(Self::try_new_internal(&clone)?);
255 let decoder = Self::Union(
256 UnionDecoderBuilder::new()
257 .with_resolved_union(info.clone())
258 .with_target(target)
259 .build()?,
260 );
261 return Ok(decoder);
262 }
263 }
264 Self::try_new_internal(data_type)
265 }
266
267 fn try_new_internal(data_type: &AvroDataType) -> Result<Self, ArrowError> {
268 let promotion = match data_type.resolution.as_ref() {
270 Some(ResolutionInfo::Promotion(p)) => Some(p),
271 _ => None,
272 };
273 let decoder = match (data_type.codec(), promotion) {
274 (Codec::Int64, Some(Promotion::IntToLong)) => {
275 Self::Int32ToInt64(Vec::with_capacity(DEFAULT_CAPACITY))
276 }
277 (Codec::Float32, Some(Promotion::IntToFloat)) => {
278 Self::Int32ToFloat32(Vec::with_capacity(DEFAULT_CAPACITY))
279 }
280 (Codec::Float64, Some(Promotion::IntToDouble)) => {
281 Self::Int32ToFloat64(Vec::with_capacity(DEFAULT_CAPACITY))
282 }
283 (Codec::Float32, Some(Promotion::LongToFloat)) => {
284 Self::Int64ToFloat32(Vec::with_capacity(DEFAULT_CAPACITY))
285 }
286 (Codec::Float64, Some(Promotion::LongToDouble)) => {
287 Self::Int64ToFloat64(Vec::with_capacity(DEFAULT_CAPACITY))
288 }
289 (Codec::Float64, Some(Promotion::FloatToDouble)) => {
290 Self::Float32ToFloat64(Vec::with_capacity(DEFAULT_CAPACITY))
291 }
292 (Codec::Utf8, Some(Promotion::BytesToString))
293 | (Codec::Utf8View, Some(Promotion::BytesToString)) => Self::BytesToString(
294 OffsetBufferBuilder::new(DEFAULT_CAPACITY),
295 Vec::with_capacity(DEFAULT_CAPACITY),
296 ),
297 (Codec::Binary, Some(Promotion::StringToBytes)) => Self::StringToBytes(
298 OffsetBufferBuilder::new(DEFAULT_CAPACITY),
299 Vec::with_capacity(DEFAULT_CAPACITY),
300 ),
301 (Codec::Null, _) => Self::Null(0),
302 (Codec::Boolean, _) => Self::Boolean(BooleanBufferBuilder::new(DEFAULT_CAPACITY)),
303 (Codec::Int32, _) => Self::Int32(Vec::with_capacity(DEFAULT_CAPACITY)),
304 (Codec::Int64, _) => Self::Int64(Vec::with_capacity(DEFAULT_CAPACITY)),
305 (Codec::Float32, _) => Self::Float32(Vec::with_capacity(DEFAULT_CAPACITY)),
306 (Codec::Float64, _) => Self::Float64(Vec::with_capacity(DEFAULT_CAPACITY)),
307 (Codec::Binary, _) => Self::Binary(
308 OffsetBufferBuilder::new(DEFAULT_CAPACITY),
309 Vec::with_capacity(DEFAULT_CAPACITY),
310 ),
311 (Codec::Utf8, _) => Self::String(
312 OffsetBufferBuilder::new(DEFAULT_CAPACITY),
313 Vec::with_capacity(DEFAULT_CAPACITY),
314 ),
315 (Codec::Utf8View, _) => Self::StringView(
316 OffsetBufferBuilder::new(DEFAULT_CAPACITY),
317 Vec::with_capacity(DEFAULT_CAPACITY),
318 ),
319 (Codec::Date32, _) => Self::Date32(Vec::with_capacity(DEFAULT_CAPACITY)),
320 (Codec::TimeMillis, _) => Self::TimeMillis(Vec::with_capacity(DEFAULT_CAPACITY)),
321 (Codec::TimeMicros, _) => Self::TimeMicros(Vec::with_capacity(DEFAULT_CAPACITY)),
322 (Codec::TimestampMillis(is_utc), _) => {
323 Self::TimestampMillis(*is_utc, Vec::with_capacity(DEFAULT_CAPACITY))
324 }
325 (Codec::TimestampMicros(is_utc), _) => {
326 Self::TimestampMicros(*is_utc, Vec::with_capacity(DEFAULT_CAPACITY))
327 }
328 (Codec::TimestampNanos(is_utc), _) => {
329 Self::TimestampNanos(*is_utc, Vec::with_capacity(DEFAULT_CAPACITY))
330 }
331 #[cfg(feature = "avro_custom_types")]
332 (Codec::DurationNanos, _) => {
333 Self::DurationNanosecond(Vec::with_capacity(DEFAULT_CAPACITY))
334 }
335 #[cfg(feature = "avro_custom_types")]
336 (Codec::DurationMicros, _) => {
337 Self::DurationMicrosecond(Vec::with_capacity(DEFAULT_CAPACITY))
338 }
339 #[cfg(feature = "avro_custom_types")]
340 (Codec::DurationMillis, _) => {
341 Self::DurationMillisecond(Vec::with_capacity(DEFAULT_CAPACITY))
342 }
343 #[cfg(feature = "avro_custom_types")]
344 (Codec::DurationSeconds, _) => {
345 Self::DurationSecond(Vec::with_capacity(DEFAULT_CAPACITY))
346 }
347 (Codec::Fixed(sz), _) => Self::Fixed(*sz, Vec::with_capacity(DEFAULT_CAPACITY)),
348 (Codec::Decimal(precision, scale, size), _) => {
349 let p = *precision;
350 let s = *scale;
351 let prec = p as u8;
352 let scl = s.unwrap_or(0) as i8;
353 #[cfg(feature = "small_decimals")]
354 {
355 if p <= DECIMAL32_MAX_PRECISION as usize {
356 let builder = Decimal32Builder::with_capacity(DEFAULT_CAPACITY)
357 .with_precision_and_scale(prec, scl)?;
358 Self::Decimal32(p, s, *size, builder)
359 } else if p <= DECIMAL64_MAX_PRECISION as usize {
360 let builder = Decimal64Builder::with_capacity(DEFAULT_CAPACITY)
361 .with_precision_and_scale(prec, scl)?;
362 Self::Decimal64(p, s, *size, builder)
363 } else if p <= DECIMAL128_MAX_PRECISION as usize {
364 let builder = Decimal128Builder::with_capacity(DEFAULT_CAPACITY)
365 .with_precision_and_scale(prec, scl)?;
366 Self::Decimal128(p, s, *size, builder)
367 } else if p <= DECIMAL256_MAX_PRECISION as usize {
368 let builder = Decimal256Builder::with_capacity(DEFAULT_CAPACITY)
369 .with_precision_and_scale(prec, scl)?;
370 Self::Decimal256(p, s, *size, builder)
371 } else {
372 return Err(ArrowError::ParseError(format!(
373 "Decimal precision {p} exceeds maximum supported"
374 )));
375 }
376 }
377 #[cfg(not(feature = "small_decimals"))]
378 {
379 if p <= DECIMAL128_MAX_PRECISION as usize {
380 let builder = Decimal128Builder::with_capacity(DEFAULT_CAPACITY)
381 .with_precision_and_scale(prec, scl)?;
382 Self::Decimal128(p, s, *size, builder)
383 } else if p <= DECIMAL256_MAX_PRECISION as usize {
384 let builder = Decimal256Builder::with_capacity(DEFAULT_CAPACITY)
385 .with_precision_and_scale(prec, scl)?;
386 Self::Decimal256(p, s, *size, builder)
387 } else {
388 return Err(ArrowError::ParseError(format!(
389 "Decimal precision {p} exceeds maximum supported"
390 )));
391 }
392 }
393 }
394 (Codec::Interval, _) => Self::Duration(IntervalMonthDayNanoBuilder::new()),
395 (Codec::List(item), _) => {
396 let decoder = Self::try_new(item)?;
397 Self::Array(
398 Arc::new(item.field_with_name("item")),
399 OffsetBufferBuilder::new(DEFAULT_CAPACITY),
400 Box::new(decoder),
401 )
402 }
403 (Codec::Enum(symbols), _) => {
404 let res = match data_type.resolution.as_ref() {
405 Some(ResolutionInfo::EnumMapping(mapping)) => Some(EnumResolution {
406 mapping: mapping.mapping.clone(),
407 default_index: mapping.default_index,
408 }),
409 _ => None,
410 };
411 Self::Enum(Vec::with_capacity(DEFAULT_CAPACITY), symbols.clone(), res)
412 }
413 (Codec::Struct(fields), _) => {
414 let mut arrow_fields = Vec::with_capacity(fields.len());
415 let mut encodings = Vec::with_capacity(fields.len());
416 for avro_field in fields.iter() {
417 let encoding = Self::try_new(avro_field.data_type())?;
418 arrow_fields.push(avro_field.field());
419 encodings.push(encoding);
420 }
421 let projector =
422 if let Some(ResolutionInfo::Record(rec)) = data_type.resolution.as_ref() {
423 Some(ProjectorBuilder::try_new(rec, fields).build()?)
424 } else {
425 None
426 };
427 Self::Record(arrow_fields.into(), encodings, projector)
428 }
429 (Codec::Map(child), _) => {
430 let val_field = child.field_with_name("value");
431 let map_field = Arc::new(ArrowField::new(
432 "entries",
433 DataType::Struct(Fields::from(vec![
434 ArrowField::new("key", DataType::Utf8, false),
435 val_field,
436 ])),
437 false,
438 ));
439 let val_dec = Self::try_new(child)?;
440 Self::Map(
441 map_field,
442 OffsetBufferBuilder::new(DEFAULT_CAPACITY),
443 OffsetBufferBuilder::new(DEFAULT_CAPACITY),
444 Vec::with_capacity(DEFAULT_CAPACITY),
445 Box::new(val_dec),
446 )
447 }
448 (Codec::Uuid, _) => Self::Uuid(Vec::with_capacity(DEFAULT_CAPACITY)),
449 (Codec::Union(encodings, fields, UnionMode::Dense), _) => {
450 let decoders = encodings
451 .iter()
452 .map(Self::try_new_internal)
453 .collect::<Result<Vec<_>, _>>()?;
454 if fields.len() != decoders.len() {
455 return Err(ArrowError::SchemaError(format!(
456 "Union has {} fields but {} decoders",
457 fields.len(),
458 decoders.len()
459 )));
460 }
461 let branch_count = decoders.len();
464 let max_addr = (i32::MAX as usize) + 1;
465 if branch_count > max_addr {
466 return Err(ArrowError::SchemaError(format!(
467 "Union has {branch_count} branches, which exceeds the maximum addressable \
468 branches by an Avro int tag ({} + 1).",
469 i32::MAX
470 )));
471 }
472 let mut builder = UnionDecoderBuilder::new()
473 .with_fields(fields.clone())
474 .with_branches(decoders);
475 if let Some(ResolutionInfo::Union(info)) = data_type.resolution.as_ref() {
476 if info.reader_is_union {
477 builder = builder.with_resolved_union(info.clone());
478 }
479 }
480 Self::Union(builder.build()?)
481 }
482 (Codec::Union(_, _, _), _) => {
483 return Err(ArrowError::NotYetImplemented(
484 "Sparse Arrow unions are not yet supported".to_string(),
485 ));
486 }
487 #[cfg(feature = "avro_custom_types")]
488 (Codec::RunEndEncoded(values_dt, width_bits_or_bytes), _) => {
489 let inner = Self::try_new(values_dt)?;
490 let byte_width: u8 = match *width_bits_or_bytes {
491 2 | 4 | 8 => *width_bits_or_bytes,
492 16 => 2,
493 32 => 4,
494 64 => 8,
495 other => {
496 return Err(ArrowError::InvalidArgumentError(format!(
497 "Unsupported run-end width {other} for RunEndEncoded; \
498 expected 16/32/64 bits or 2/4/8 bytes"
499 )));
500 }
501 };
502 Self::RunEndEncoded(byte_width, 0, Box::new(inner))
503 }
504 };
505 Ok(match data_type.nullability() {
506 Some(nullability) => {
507 let mut plan = NullablePlan::ReadTag;
509 if let Some(ResolutionInfo::Union(info)) = data_type.resolution.as_ref() {
510 if !info.writer_is_union && info.reader_is_union {
511 if let Some(Some((_reader_idx, promo))) = info.writer_to_reader.first() {
512 plan = NullablePlan::FromSingle { promotion: *promo };
513 }
514 }
515 }
516 Self::Nullable(
517 nullability,
518 NullBufferBuilder::new(DEFAULT_CAPACITY),
519 Box::new(decoder),
520 plan,
521 )
522 }
523 None => decoder,
524 })
525 }
526
527 fn append_null(&mut self) -> Result<(), ArrowError> {
529 match self {
530 Self::Null(count) => *count += 1,
531 Self::Boolean(b) => b.append(false),
532 Self::Int32(v) | Self::Date32(v) | Self::TimeMillis(v) => v.push(0),
533 Self::Int64(v)
534 | Self::Int32ToInt64(v)
535 | Self::TimeMicros(v)
536 | Self::TimestampMillis(_, v)
537 | Self::TimestampMicros(_, v)
538 | Self::TimestampNanos(_, v) => v.push(0),
539 #[cfg(feature = "avro_custom_types")]
540 Self::DurationSecond(v)
541 | Self::DurationMillisecond(v)
542 | Self::DurationMicrosecond(v)
543 | Self::DurationNanosecond(v) => v.push(0),
544 Self::Float32(v) | Self::Int32ToFloat32(v) | Self::Int64ToFloat32(v) => v.push(0.),
545 Self::Float64(v)
546 | Self::Int32ToFloat64(v)
547 | Self::Int64ToFloat64(v)
548 | Self::Float32ToFloat64(v) => v.push(0.),
549 Self::Binary(offsets, _)
550 | Self::String(offsets, _)
551 | Self::StringView(offsets, _)
552 | Self::BytesToString(offsets, _)
553 | Self::StringToBytes(offsets, _) => {
554 offsets.push_length(0);
555 }
556 Self::Uuid(v) => {
557 v.extend([0; 16]);
558 }
559 Self::Array(_, offsets, _) => {
560 offsets.push_length(0);
561 }
562 Self::Record(_, e, _) => {
563 for encoding in e.iter_mut() {
564 encoding.append_null()?;
565 }
566 }
567 Self::Map(_, _koff, moff, _, _) => {
568 moff.push_length(0);
569 }
570 Self::Fixed(sz, accum) => {
571 accum.extend(std::iter::repeat_n(0u8, *sz as usize));
572 }
573 #[cfg(feature = "small_decimals")]
574 Self::Decimal32(_, _, _, builder) => builder.append_value(0),
575 #[cfg(feature = "small_decimals")]
576 Self::Decimal64(_, _, _, builder) => builder.append_value(0),
577 Self::Decimal128(_, _, _, builder) => builder.append_value(0),
578 Self::Decimal256(_, _, _, builder) => builder.append_value(i256::ZERO),
579 Self::Enum(indices, _, _) => indices.push(0),
580 Self::Duration(builder) => builder.append_null(),
581 #[cfg(feature = "avro_custom_types")]
582 Self::RunEndEncoded(_, len, inner) => {
583 *len += 1;
584 inner.append_null()?;
585 }
586 Self::Union(u) => u.append_null()?,
587 Self::Nullable(_, null_buffer, inner, _) => {
588 null_buffer.append(false);
589 inner.append_null()?;
590 }
591 }
592 Ok(())
593 }
594
595 fn append_default(&mut self, lit: &AvroLiteral) -> Result<(), ArrowError> {
597 match self {
598 Self::Nullable(_, nb, inner, _) => {
599 if matches!(lit, AvroLiteral::Null) {
600 nb.append(false);
601 inner.append_null()
602 } else {
603 nb.append(true);
604 inner.append_default(lit)
605 }
606 }
607 Self::Null(count) => match lit {
608 AvroLiteral::Null => {
609 *count += 1;
610 Ok(())
611 }
612 _ => Err(ArrowError::InvalidArgumentError(
613 "Non-null default for null type".to_string(),
614 )),
615 },
616 Self::Boolean(b) => match lit {
617 AvroLiteral::Boolean(v) => {
618 b.append(*v);
619 Ok(())
620 }
621 _ => Err(ArrowError::InvalidArgumentError(
622 "Default for boolean must be boolean".to_string(),
623 )),
624 },
625 Self::Int32(v) | Self::Date32(v) | Self::TimeMillis(v) => match lit {
626 AvroLiteral::Int(i) => {
627 v.push(*i);
628 Ok(())
629 }
630 _ => Err(ArrowError::InvalidArgumentError(
631 "Default for int32/date32/time-millis must be int".to_string(),
632 )),
633 },
634 #[cfg(feature = "avro_custom_types")]
635 Self::DurationSecond(v)
636 | Self::DurationMillisecond(v)
637 | Self::DurationMicrosecond(v)
638 | Self::DurationNanosecond(v) => match lit {
639 AvroLiteral::Long(i) => {
640 v.push(*i);
641 Ok(())
642 }
643 _ => Err(ArrowError::InvalidArgumentError(
644 "Default for duration long must be long".to_string(),
645 )),
646 },
647 Self::Int64(v)
648 | Self::Int32ToInt64(v)
649 | Self::TimeMicros(v)
650 | Self::TimestampMillis(_, v)
651 | Self::TimestampMicros(_, v)
652 | Self::TimestampNanos(_, v) => match lit {
653 AvroLiteral::Long(i) => {
654 v.push(*i);
655 Ok(())
656 }
657 AvroLiteral::Int(i) => {
658 v.push(*i as i64);
659 Ok(())
660 }
661 _ => Err(ArrowError::InvalidArgumentError(
662 "Default for long/time-micros/timestamp must be long or int".to_string(),
663 )),
664 },
665 Self::Float32(v) | Self::Int32ToFloat32(v) | Self::Int64ToFloat32(v) => match lit {
666 AvroLiteral::Float(f) => {
667 v.push(*f);
668 Ok(())
669 }
670 _ => Err(ArrowError::InvalidArgumentError(
671 "Default for float must be float".to_string(),
672 )),
673 },
674 Self::Float64(v)
675 | Self::Int32ToFloat64(v)
676 | Self::Int64ToFloat64(v)
677 | Self::Float32ToFloat64(v) => match lit {
678 AvroLiteral::Double(f) => {
679 v.push(*f);
680 Ok(())
681 }
682 _ => Err(ArrowError::InvalidArgumentError(
683 "Default for double must be double".to_string(),
684 )),
685 },
686 Self::Binary(offsets, values) | Self::StringToBytes(offsets, values) => match lit {
687 AvroLiteral::Bytes(b) => {
688 offsets.push_length(b.len());
689 values.extend_from_slice(b);
690 Ok(())
691 }
692 _ => Err(ArrowError::InvalidArgumentError(
693 "Default for bytes must be bytes".to_string(),
694 )),
695 },
696 Self::BytesToString(offsets, values)
697 | Self::String(offsets, values)
698 | Self::StringView(offsets, values) => match lit {
699 AvroLiteral::String(s) => {
700 let b = s.as_bytes();
701 offsets.push_length(b.len());
702 values.extend_from_slice(b);
703 Ok(())
704 }
705 _ => Err(ArrowError::InvalidArgumentError(
706 "Default for string must be string".to_string(),
707 )),
708 },
709 Self::Uuid(values) => match lit {
710 AvroLiteral::String(s) => {
711 let uuid = Uuid::try_parse(s).map_err(|e| {
712 ArrowError::InvalidArgumentError(format!("Invalid UUID default: {s} ({e})"))
713 })?;
714 values.extend_from_slice(uuid.as_bytes());
715 Ok(())
716 }
717 _ => Err(ArrowError::InvalidArgumentError(
718 "Default for uuid must be string".to_string(),
719 )),
720 },
721 Self::Fixed(sz, accum) => match lit {
722 AvroLiteral::Bytes(b) => {
723 if b.len() != *sz as usize {
724 return Err(ArrowError::InvalidArgumentError(format!(
725 "Fixed default length {} does not match size {sz}",
726 b.len(),
727 )));
728 }
729 accum.extend_from_slice(b);
730 Ok(())
731 }
732 _ => Err(ArrowError::InvalidArgumentError(
733 "Default for fixed must be bytes".to_string(),
734 )),
735 },
736 #[cfg(feature = "small_decimals")]
737 Self::Decimal32(_, _, _, builder) => {
738 append_decimal_default!(lit, builder, 4, i32, "decimal32")
739 }
740 #[cfg(feature = "small_decimals")]
741 Self::Decimal64(_, _, _, builder) => {
742 append_decimal_default!(lit, builder, 8, i64, "decimal64")
743 }
744 Self::Decimal128(_, _, _, builder) => {
745 append_decimal_default!(lit, builder, 16, i128, "decimal128")
746 }
747 Self::Decimal256(_, _, _, builder) => {
748 append_decimal_default!(lit, builder, 32, i256, "decimal256")
749 }
750 Self::Duration(builder) => match lit {
751 AvroLiteral::Bytes(b) => {
752 if b.len() != 12 {
753 return Err(ArrowError::InvalidArgumentError(format!(
754 "Duration default must be exactly 12 bytes, got {}",
755 b.len()
756 )));
757 }
758 let months = u32::from_le_bytes([b[0], b[1], b[2], b[3]]);
759 let days = u32::from_le_bytes([b[4], b[5], b[6], b[7]]);
760 let millis = u32::from_le_bytes([b[8], b[9], b[10], b[11]]);
761 let nanos = (millis as i64) * 1_000_000;
762 builder.append_value(IntervalMonthDayNano::new(
763 months as i32,
764 days as i32,
765 nanos,
766 ));
767 Ok(())
768 }
769 _ => Err(ArrowError::InvalidArgumentError(
770 "Default for duration must be 12-byte little-endian months/days/millis"
771 .to_string(),
772 )),
773 },
774 Self::Array(_, offsets, inner) => match lit {
775 AvroLiteral::Array(items) => {
776 offsets.push_length(items.len());
777 for item in items {
778 inner.append_default(item)?;
779 }
780 Ok(())
781 }
782 _ => Err(ArrowError::InvalidArgumentError(
783 "Default for array must be an array literal".to_string(),
784 )),
785 },
786 Self::Map(_, koff, moff, kdata, valdec) => match lit {
787 AvroLiteral::Map(entries) => {
788 moff.push_length(entries.len());
789 for (k, v) in entries {
790 let kb = k.as_bytes();
791 koff.push_length(kb.len());
792 kdata.extend_from_slice(kb);
793 valdec.append_default(v)?;
794 }
795 Ok(())
796 }
797 _ => Err(ArrowError::InvalidArgumentError(
798 "Default for map must be a map/object literal".to_string(),
799 )),
800 },
801 Self::Enum(indices, symbols, _) => match lit {
802 AvroLiteral::Enum(sym) => {
803 let pos = symbols.iter().position(|s| s == sym).ok_or_else(|| {
804 ArrowError::InvalidArgumentError(format!(
805 "Enum default symbol {sym:?} not in reader symbols"
806 ))
807 })?;
808 indices.push(pos as i32);
809 Ok(())
810 }
811 _ => Err(ArrowError::InvalidArgumentError(
812 "Default for enum must be a symbol".to_string(),
813 )),
814 },
815 #[cfg(feature = "avro_custom_types")]
816 Self::RunEndEncoded(_, len, inner) => {
817 *len += 1;
818 inner.append_default(lit)
819 }
820 Self::Union(u) => u.append_default(lit),
821 Self::Record(field_meta, decoders, projector) => match lit {
822 AvroLiteral::Map(entries) => {
823 for (i, dec) in decoders.iter_mut().enumerate() {
824 let name = field_meta[i].name();
825 if let Some(sub) = entries.get(name) {
826 dec.append_default(sub)?;
827 } else if let Some(proj) = projector.as_ref() {
828 proj.project_default(dec, i)?;
829 } else {
830 dec.append_null()?;
831 }
832 }
833 Ok(())
834 }
835 AvroLiteral::Null => {
836 for (i, dec) in decoders.iter_mut().enumerate() {
837 if let Some(proj) = projector.as_ref() {
838 proj.project_default(dec, i)?;
839 } else {
840 dec.append_null()?;
841 }
842 }
843 Ok(())
844 }
845 _ => Err(ArrowError::InvalidArgumentError(
846 "Default for record must be a map/object or null".to_string(),
847 )),
848 },
849 }
850 }
851
852 fn decode(&mut self, buf: &mut AvroCursor<'_>) -> Result<(), ArrowError> {
854 match self {
855 Self::Null(x) => *x += 1,
856 Self::Boolean(values) => values.append(buf.get_bool()?),
857 Self::Int32(values) | Self::Date32(values) | Self::TimeMillis(values) => {
858 values.push(buf.get_int()?)
859 }
860 Self::Int64(values)
861 | Self::TimeMicros(values)
862 | Self::TimestampMillis(_, values)
863 | Self::TimestampMicros(_, values)
864 | Self::TimestampNanos(_, values) => values.push(buf.get_long()?),
865 #[cfg(feature = "avro_custom_types")]
866 Self::DurationSecond(values)
867 | Self::DurationMillisecond(values)
868 | Self::DurationMicrosecond(values)
869 | Self::DurationNanosecond(values) => values.push(buf.get_long()?),
870 Self::Float32(values) => values.push(buf.get_float()?),
871 Self::Float64(values) => values.push(buf.get_double()?),
872 Self::Int32ToInt64(values) => values.push(buf.get_int()? as i64),
873 Self::Int32ToFloat32(values) => values.push(buf.get_int()? as f32),
874 Self::Int32ToFloat64(values) => values.push(buf.get_int()? as f64),
875 Self::Int64ToFloat32(values) => values.push(buf.get_long()? as f32),
876 Self::Int64ToFloat64(values) => values.push(buf.get_long()? as f64),
877 Self::Float32ToFloat64(values) => values.push(buf.get_float()? as f64),
878 Self::StringToBytes(offsets, values)
879 | Self::BytesToString(offsets, values)
880 | Self::Binary(offsets, values)
881 | Self::String(offsets, values)
882 | Self::StringView(offsets, values) => {
883 let data = buf.get_bytes()?;
884 offsets.push_length(data.len());
885 values.extend_from_slice(data);
886 }
887 Self::Uuid(values) => {
888 let s_bytes = buf.get_bytes()?;
889 let s = std::str::from_utf8(s_bytes).map_err(|e| {
890 ArrowError::ParseError(format!("UUID bytes are not valid UTF-8: {e}"))
891 })?;
892 let uuid = Uuid::try_parse(s)
893 .map_err(|e| ArrowError::ParseError(format!("Failed to parse uuid: {e}")))?;
894 values.extend_from_slice(uuid.as_bytes());
895 }
896 Self::Array(_, off, encoding) => {
897 let total_items = read_blocks(buf, |cursor| encoding.decode(cursor))?;
898 off.push_length(total_items);
899 }
900 Self::Record(_, encodings, None) => {
901 for encoding in encodings {
902 encoding.decode(buf)?;
903 }
904 }
905 Self::Record(_, encodings, Some(proj)) => {
906 proj.project_record(buf, encodings)?;
907 }
908 Self::Map(_, koff, moff, kdata, valdec) => {
909 let newly_added = read_blocks(buf, |cur| {
910 let kb = cur.get_bytes()?;
911 koff.push_length(kb.len());
912 kdata.extend_from_slice(kb);
913 valdec.decode(cur)
914 })?;
915 moff.push_length(newly_added);
916 }
917 Self::Fixed(sz, accum) => {
918 let fx = buf.get_fixed(*sz as usize)?;
919 accum.extend_from_slice(fx);
920 }
921 #[cfg(feature = "small_decimals")]
922 Self::Decimal32(_, _, size, builder) => {
923 decode_decimal!(size, buf, builder, 4, i32);
924 }
925 #[cfg(feature = "small_decimals")]
926 Self::Decimal64(_, _, size, builder) => {
927 decode_decimal!(size, buf, builder, 8, i64);
928 }
929 Self::Decimal128(_, _, size, builder) => {
930 decode_decimal!(size, buf, builder, 16, i128);
931 }
932 Self::Decimal256(_, _, size, builder) => {
933 decode_decimal!(size, buf, builder, 32, i256);
934 }
935 Self::Enum(indices, _, None) => {
936 indices.push(buf.get_int()?);
937 }
938 Self::Enum(indices, _, Some(res)) => {
939 let raw = buf.get_int()?;
940 let resolved = usize::try_from(raw)
941 .ok()
942 .and_then(|idx| res.mapping.get(idx).copied())
943 .filter(|&idx| idx >= 0)
944 .unwrap_or(res.default_index);
945 if resolved >= 0 {
946 indices.push(resolved);
947 } else {
948 return Err(ArrowError::ParseError(format!(
949 "Enum symbol index {raw} not resolvable and no default provided",
950 )));
951 }
952 }
953 Self::Duration(builder) => {
954 let b = buf.get_fixed(12)?;
955 let months = u32::from_le_bytes(b[0..4].try_into().unwrap());
956 let days = u32::from_le_bytes(b[4..8].try_into().unwrap());
957 let millis = u32::from_le_bytes(b[8..12].try_into().unwrap());
958 let nanos = (millis as i64) * 1_000_000;
959 builder.append_value(IntervalMonthDayNano::new(months as i32, days as i32, nanos));
960 }
961 #[cfg(feature = "avro_custom_types")]
962 Self::RunEndEncoded(_, len, inner) => {
963 *len += 1;
964 inner.decode(buf)?;
965 }
966 Self::Union(u) => u.decode(buf)?,
967 Self::Nullable(order, nb, encoding, plan) => {
968 match *plan {
969 NullablePlan::FromSingle { promotion } => {
970 encoding.decode_with_promotion(buf, promotion)?;
971 nb.append(true);
972 }
973 NullablePlan::ReadTag => {
974 let branch = buf.read_vlq()?;
975 let is_not_null = match *order {
976 Nullability::NullFirst => branch != 0,
977 Nullability::NullSecond => branch == 0,
978 };
979 if is_not_null {
980 encoding.decode(buf)?;
982 } else {
983 encoding.append_null()?;
984 }
985 nb.append(is_not_null);
986 }
987 }
988 }
989 }
990 Ok(())
991 }
992
993 fn decode_with_promotion(
994 &mut self,
995 buf: &mut AvroCursor<'_>,
996 promotion: Promotion,
997 ) -> Result<(), ArrowError> {
998 #[cfg(feature = "avro_custom_types")]
999 if let Self::RunEndEncoded(_, len, inner) = self {
1000 *len += 1;
1001 return inner.decode_with_promotion(buf, promotion);
1002 }
1003
1004 macro_rules! promote_numeric_to {
1005 ($variant:ident, $getter:ident, $to:ty) => {{
1006 match self {
1007 Self::$variant(v) => {
1008 let x = buf.$getter()?;
1009 v.push(x as $to);
1010 Ok(())
1011 }
1012 other => Err(ArrowError::ParseError(format!(
1013 "Promotion {promotion} target mismatch: expected {}, got {}",
1014 stringify!($variant),
1015 <Self as ::std::convert::AsRef<str>>::as_ref(other)
1016 ))),
1017 }
1018 }};
1019 }
1020 match promotion {
1021 Promotion::Direct => self.decode(buf),
1022 Promotion::IntToLong => promote_numeric_to!(Int64, get_int, i64),
1023 Promotion::IntToFloat => promote_numeric_to!(Float32, get_int, f32),
1024 Promotion::IntToDouble => promote_numeric_to!(Float64, get_int, f64),
1025 Promotion::LongToFloat => promote_numeric_to!(Float32, get_long, f32),
1026 Promotion::LongToDouble => promote_numeric_to!(Float64, get_long, f64),
1027 Promotion::FloatToDouble => promote_numeric_to!(Float64, get_float, f64),
1028 Promotion::StringToBytes => match self {
1029 Self::Binary(offsets, values) | Self::StringToBytes(offsets, values) => {
1030 let data = buf.get_bytes()?;
1031 offsets.push_length(data.len());
1032 values.extend_from_slice(data);
1033 Ok(())
1034 }
1035 other => Err(ArrowError::ParseError(format!(
1036 "Promotion {promotion} target mismatch: expected bytes (Binary/StringToBytes), got {}",
1037 <Self as AsRef<str>>::as_ref(other)
1038 ))),
1039 },
1040 Promotion::BytesToString => match self {
1041 Self::String(offsets, values)
1042 | Self::StringView(offsets, values)
1043 | Self::BytesToString(offsets, values) => {
1044 let data = buf.get_bytes()?;
1045 offsets.push_length(data.len());
1046 values.extend_from_slice(data);
1047 Ok(())
1048 }
1049 other => Err(ArrowError::ParseError(format!(
1050 "Promotion {promotion} target mismatch: expected string (String/StringView/BytesToString), got {}",
1051 <Self as AsRef<str>>::as_ref(other)
1052 ))),
1053 },
1054 }
1055 }
1056
1057 fn flush(&mut self, nulls: Option<NullBuffer>) -> Result<ArrayRef, ArrowError> {
1059 Ok(match self {
1060 Self::Nullable(_, n, e, _) => e.flush(n.finish())?,
1061 Self::Null(size) => Arc::new(NullArray::new(std::mem::replace(size, 0))),
1062 Self::Boolean(b) => Arc::new(BooleanArray::new(b.finish(), nulls)),
1063 Self::Int32(values) => Arc::new(flush_primitive::<Int32Type>(values, nulls)),
1064 Self::Date32(values) => Arc::new(flush_primitive::<Date32Type>(values, nulls)),
1065 Self::Int64(values) => Arc::new(flush_primitive::<Int64Type>(values, nulls)),
1066 Self::TimeMillis(values) => {
1067 Arc::new(flush_primitive::<Time32MillisecondType>(values, nulls))
1068 }
1069 Self::TimeMicros(values) => {
1070 Arc::new(flush_primitive::<Time64MicrosecondType>(values, nulls))
1071 }
1072 Self::TimestampMillis(is_utc, values) => Arc::new(
1073 flush_primitive::<TimestampMillisecondType>(values, nulls)
1074 .with_timezone_opt(is_utc.then(|| "+00:00")),
1075 ),
1076 Self::TimestampMicros(is_utc, values) => Arc::new(
1077 flush_primitive::<TimestampMicrosecondType>(values, nulls)
1078 .with_timezone_opt(is_utc.then(|| "+00:00")),
1079 ),
1080 Self::TimestampNanos(is_utc, values) => Arc::new(
1081 flush_primitive::<TimestampNanosecondType>(values, nulls)
1082 .with_timezone_opt(is_utc.then(|| "+00:00")),
1083 ),
1084 #[cfg(feature = "avro_custom_types")]
1085 Self::DurationSecond(values) => {
1086 Arc::new(flush_primitive::<DurationSecondType>(values, nulls))
1087 }
1088 #[cfg(feature = "avro_custom_types")]
1089 Self::DurationMillisecond(values) => {
1090 Arc::new(flush_primitive::<DurationMillisecondType>(values, nulls))
1091 }
1092 #[cfg(feature = "avro_custom_types")]
1093 Self::DurationMicrosecond(values) => {
1094 Arc::new(flush_primitive::<DurationMicrosecondType>(values, nulls))
1095 }
1096 #[cfg(feature = "avro_custom_types")]
1097 Self::DurationNanosecond(values) => {
1098 Arc::new(flush_primitive::<DurationNanosecondType>(values, nulls))
1099 }
1100 Self::Float32(values) => Arc::new(flush_primitive::<Float32Type>(values, nulls)),
1101 Self::Float64(values) => Arc::new(flush_primitive::<Float64Type>(values, nulls)),
1102 Self::Int32ToInt64(values) => Arc::new(flush_primitive::<Int64Type>(values, nulls)),
1103 Self::Int32ToFloat32(values) | Self::Int64ToFloat32(values) => {
1104 Arc::new(flush_primitive::<Float32Type>(values, nulls))
1105 }
1106 Self::Int32ToFloat64(values)
1107 | Self::Int64ToFloat64(values)
1108 | Self::Float32ToFloat64(values) => {
1109 Arc::new(flush_primitive::<Float64Type>(values, nulls))
1110 }
1111 Self::StringToBytes(offsets, values) | Self::Binary(offsets, values) => {
1112 let offsets = flush_offsets(offsets);
1113 let values = flush_values(values).into();
1114 Arc::new(BinaryArray::try_new(offsets, values, nulls)?)
1115 }
1116 Self::BytesToString(offsets, values) | Self::String(offsets, values) => {
1117 let offsets = flush_offsets(offsets);
1118 let values = flush_values(values).into();
1119 Arc::new(StringArray::try_new(offsets, values, nulls)?)
1120 }
1121 Self::StringView(offsets, values) => {
1122 let offsets = flush_offsets(offsets);
1123 let values = flush_values(values);
1124 let array = StringArray::try_new(offsets, values.into(), nulls.clone())?;
1125 let values: Vec<&str> = (0..array.len())
1126 .map(|i| {
1127 if array.is_valid(i) {
1128 array.value(i)
1129 } else {
1130 ""
1131 }
1132 })
1133 .collect();
1134 Arc::new(StringViewArray::from(values))
1135 }
1136 Self::Array(field, offsets, values) => {
1137 let values = values.flush(None)?;
1138 let offsets = flush_offsets(offsets);
1139 Arc::new(ListArray::try_new(field.clone(), offsets, values, nulls)?)
1140 }
1141 Self::Record(fields, encodings, _) => {
1142 let arrays = encodings
1143 .iter_mut()
1144 .map(|x| x.flush(None))
1145 .collect::<Result<Vec<_>, _>>()?;
1146 Arc::new(StructArray::try_new(fields.clone(), arrays, nulls)?)
1147 }
1148 Self::Map(map_field, k_off, m_off, kdata, valdec) => {
1149 let moff = flush_offsets(m_off);
1150 let koff = flush_offsets(k_off);
1151 let kd = flush_values(kdata).into();
1152 let val_arr = valdec.flush(None)?;
1153 let key_arr = StringArray::try_new(koff, kd, None)?;
1154 if key_arr.len() != val_arr.len() {
1155 return Err(ArrowError::InvalidArgumentError(format!(
1156 "Map keys length ({}) != map values length ({})",
1157 key_arr.len(),
1158 val_arr.len()
1159 )));
1160 }
1161 let final_len = moff.len() - 1;
1162 if let Some(n) = &nulls {
1163 if n.len() != final_len {
1164 return Err(ArrowError::InvalidArgumentError(format!(
1165 "Map array null buffer length {} != final map length {final_len}",
1166 n.len()
1167 )));
1168 }
1169 }
1170 let entries_fields = match map_field.data_type() {
1171 DataType::Struct(fields) => fields.clone(),
1172 other => {
1173 return Err(ArrowError::InvalidArgumentError(format!(
1174 "Map entries field must be a Struct, got {other:?}"
1175 )));
1176 }
1177 };
1178 let entries_struct =
1179 StructArray::try_new(entries_fields, vec![Arc::new(key_arr), val_arr], None)?;
1180 let map_arr =
1181 MapArray::try_new(map_field.clone(), moff, entries_struct, nulls, false)?;
1182 Arc::new(map_arr)
1183 }
1184 Self::Fixed(sz, accum) => {
1185 let b: Buffer = flush_values(accum).into();
1186 let arr = FixedSizeBinaryArray::try_new(*sz, b, nulls)
1187 .map_err(|e| ArrowError::ParseError(e.to_string()))?;
1188 Arc::new(arr)
1189 }
1190 Self::Uuid(values) => {
1191 let arr = FixedSizeBinaryArray::try_new(16, std::mem::take(values).into(), nulls)
1192 .map_err(|e| ArrowError::ParseError(e.to_string()))?;
1193 Arc::new(arr)
1194 }
1195 #[cfg(feature = "small_decimals")]
1196 Self::Decimal32(precision, scale, _, builder) => {
1197 flush_decimal!(builder, precision, scale, nulls, Decimal32Array)
1198 }
1199 #[cfg(feature = "small_decimals")]
1200 Self::Decimal64(precision, scale, _, builder) => {
1201 flush_decimal!(builder, precision, scale, nulls, Decimal64Array)
1202 }
1203 Self::Decimal128(precision, scale, _, builder) => {
1204 flush_decimal!(builder, precision, scale, nulls, Decimal128Array)
1205 }
1206 Self::Decimal256(precision, scale, _, builder) => {
1207 flush_decimal!(builder, precision, scale, nulls, Decimal256Array)
1208 }
1209 Self::Enum(indices, symbols, _) => flush_dict(indices, symbols, nulls)?,
1210 Self::Duration(builder) => {
1211 let (_, vals, _) = builder.finish().into_parts();
1212 let vals = IntervalMonthDayNanoArray::try_new(vals, nulls)
1213 .map_err(|e| ArrowError::ParseError(e.to_string()))?;
1214 Arc::new(vals)
1215 }
1216 #[cfg(feature = "avro_custom_types")]
1217 Self::RunEndEncoded(width, len, inner) => {
1218 let values = inner.flush(nulls)?;
1219 let n = *len;
1220 let arr = values.as_ref();
1221 let mut run_starts: Vec<usize> = Vec::with_capacity(n);
1222 if n > 0 {
1223 run_starts.push(0);
1224 for i in 1..n {
1225 if !values_equal_at(arr, i - 1, i) {
1226 run_starts.push(i);
1227 }
1228 }
1229 }
1230 if n > (u32::MAX as usize) {
1231 return Err(ArrowError::InvalidArgumentError(format!(
1232 "RunEndEncoded length {n} exceeds maximum supported by UInt32 indices for take",
1233 )));
1234 }
1235 let run_count = run_starts.len();
1236 let take_idx: PrimitiveArray<UInt32Type> =
1237 run_starts.iter().map(|&s| s as u32).collect();
1238 let per_run_values = if run_count == 0 {
1239 values.slice(0, 0)
1240 } else {
1241 take(arr, &take_idx, Option::from(TakeOptions::default())).map_err(|e| {
1242 ArrowError::ParseError(format!("take() for REE values failed: {e}"))
1243 })?
1244 };
1245
1246 macro_rules! build_run_array {
1247 ($Native:ty, $ArrowTy:ty) => {{
1248 let mut ends: Vec<$Native> = Vec::with_capacity(run_count);
1249 for (idx, &_start) in run_starts.iter().enumerate() {
1250 let end = if idx + 1 < run_count {
1251 run_starts[idx + 1]
1252 } else {
1253 n
1254 };
1255 ends.push(end as $Native);
1256 }
1257 let ends: PrimitiveArray<$ArrowTy> = ends.into_iter().collect();
1258 let run_arr = RunArray::<$ArrowTy>::try_new(&ends, per_run_values.as_ref())
1259 .map_err(|e| ArrowError::ParseError(e.to_string()))?;
1260 Arc::new(run_arr) as ArrayRef
1261 }};
1262 }
1263 match *width {
1264 2 => {
1265 if n > i16::MAX as usize {
1266 return Err(ArrowError::InvalidArgumentError(format!(
1267 "RunEndEncoded length {n} exceeds i16::MAX for run end width 2"
1268 )));
1269 }
1270 build_run_array!(i16, Int16Type)
1271 }
1272 4 => build_run_array!(i32, Int32Type),
1273 8 => build_run_array!(i64, Int64Type),
1274 other => {
1275 return Err(ArrowError::InvalidArgumentError(format!(
1276 "Unsupported run-end width {other} for RunEndEncoded"
1277 )));
1278 }
1279 }
1280 }
1281 Self::Union(u) => u.flush(nulls)?,
1282 })
1283 }
1284}
1285
1286#[derive(Debug)]
1288struct DispatchLookupTable {
1289 to_reader: Box<[i8]>,
1305 promotion: Box<[Promotion]>,
1310}
1311
1312const NO_SOURCE: i8 = -1;
1315
1316impl DispatchLookupTable {
1317 fn from_writer_to_reader(
1318 promotion_map: &[Option<(usize, Promotion)>],
1319 ) -> Result<Self, ArrowError> {
1320 let mut to_reader = Vec::with_capacity(promotion_map.len());
1321 let mut promotion = Vec::with_capacity(promotion_map.len());
1322 for map in promotion_map {
1323 match *map {
1324 Some((idx, promo)) => {
1325 let idx_i8 = i8::try_from(idx).map_err(|_| {
1326 ArrowError::SchemaError(format!(
1327 "Reader branch index {idx} exceeds i8 range (max {})",
1328 i8::MAX
1329 ))
1330 })?;
1331 to_reader.push(idx_i8);
1332 promotion.push(promo);
1333 }
1334 None => {
1335 to_reader.push(NO_SOURCE);
1336 promotion.push(Promotion::Direct);
1337 }
1338 }
1339 }
1340 Ok(Self {
1341 to_reader: to_reader.into_boxed_slice(),
1342 promotion: promotion.into_boxed_slice(),
1343 })
1344 }
1345
1346 #[inline]
1348 fn resolve(&self, writer_index: usize) -> Option<(usize, Promotion)> {
1349 let reader_index = *self.to_reader.get(writer_index)?;
1350 (reader_index >= 0).then(|| (reader_index as usize, self.promotion[writer_index]))
1351 }
1352}
1353
1354#[derive(Debug)]
1355struct UnionDecoder {
1356 fields: UnionFields,
1357 type_ids: Vec<i8>,
1358 offsets: Vec<i32>,
1359 branches: Vec<Decoder>,
1360 counts: Vec<i32>,
1361 reader_type_codes: Vec<i8>,
1362 default_emit_idx: usize,
1363 null_emit_idx: usize,
1364 plan: UnionReadPlan,
1365}
1366
1367impl Default for UnionDecoder {
1368 fn default() -> Self {
1369 Self {
1370 fields: UnionFields::empty(),
1371 type_ids: Vec::new(),
1372 offsets: Vec::new(),
1373 branches: Vec::new(),
1374 counts: Vec::new(),
1375 reader_type_codes: Vec::new(),
1376 default_emit_idx: 0,
1377 null_emit_idx: 0,
1378 plan: UnionReadPlan::Passthrough,
1379 }
1380 }
1381}
1382
1383#[derive(Debug)]
1384enum UnionReadPlan {
1385 ReaderUnion {
1386 lookup_table: DispatchLookupTable,
1387 },
1388 FromSingle {
1389 reader_idx: usize,
1390 promotion: Promotion,
1391 },
1392 ToSingle {
1393 target: Box<Decoder>,
1394 lookup_table: DispatchLookupTable,
1395 },
1396 Passthrough,
1397}
1398
1399impl UnionDecoder {
1400 fn try_new(
1401 fields: UnionFields,
1402 branches: Vec<Decoder>,
1403 resolved: Option<ResolvedUnion>,
1404 ) -> Result<Self, ArrowError> {
1405 let reader_type_codes = fields.iter().map(|(tid, _)| tid).collect::<Vec<i8>>();
1406 let null_branch = branches.iter().position(|b| matches!(b, Decoder::Null(_)));
1407 let default_emit_idx = 0;
1408 let null_emit_idx = null_branch.unwrap_or(default_emit_idx);
1409 let branch_len = branches.len().max(reader_type_codes.len());
1410 let max_addr = (i32::MAX as usize) + 1;
1412 if branches.len() > max_addr {
1413 return Err(ArrowError::SchemaError(format!(
1414 "Reader union has {} branches, which exceeds the maximum addressable \
1415 branches by an Avro int tag ({} + 1).",
1416 branches.len(),
1417 i32::MAX
1418 )));
1419 }
1420 Ok(Self {
1421 fields,
1422 type_ids: Vec::with_capacity(DEFAULT_CAPACITY),
1423 offsets: Vec::with_capacity(DEFAULT_CAPACITY),
1424 branches,
1425 counts: vec![0; branch_len],
1426 reader_type_codes,
1427 default_emit_idx,
1428 null_emit_idx,
1429 plan: Self::plan_from_resolved(resolved)?,
1430 })
1431 }
1432
1433 fn try_new_from_writer_union(
1434 info: ResolvedUnion,
1435 target: Box<Decoder>,
1436 ) -> Result<Self, ArrowError> {
1437 debug_assert!(info.writer_is_union && !info.reader_is_union);
1439 let lookup_table = DispatchLookupTable::from_writer_to_reader(&info.writer_to_reader)?;
1440 Ok(Self {
1441 plan: UnionReadPlan::ToSingle {
1442 target,
1443 lookup_table,
1444 },
1445 ..Self::default()
1446 })
1447 }
1448
1449 fn plan_from_resolved(resolved: Option<ResolvedUnion>) -> Result<UnionReadPlan, ArrowError> {
1450 let Some(info) = resolved else {
1451 return Ok(UnionReadPlan::Passthrough);
1452 };
1453 match (info.writer_is_union, info.reader_is_union) {
1454 (true, true) => {
1455 let lookup_table =
1456 DispatchLookupTable::from_writer_to_reader(&info.writer_to_reader)?;
1457 Ok(UnionReadPlan::ReaderUnion { lookup_table })
1458 }
1459 (false, true) => {
1460 let Some(&(reader_idx, promotion)) =
1461 info.writer_to_reader.first().and_then(Option::as_ref)
1462 else {
1463 return Err(ArrowError::SchemaError(
1464 "Writer type does not match any reader union branch".to_string(),
1465 ));
1466 };
1467 Ok(UnionReadPlan::FromSingle {
1468 reader_idx,
1469 promotion,
1470 })
1471 }
1472 (true, false) => Err(ArrowError::InvalidArgumentError(
1473 "UnionDecoder::try_new cannot build writer-union to single; use UnionDecoderBuilder with a target"
1474 .to_string(),
1475 )),
1476 _ => Err(ArrowError::SchemaError(
1478 "ResolvedUnion constructed for non-union sides; resolver should return None"
1479 .to_string(),
1480 )),
1481 }
1482 }
1483
1484 #[inline]
1485 fn read_tag(buf: &mut AvroCursor<'_>) -> Result<usize, ArrowError> {
1486 let raw = buf.get_long()?;
1491 if raw < 0 {
1492 return Err(ArrowError::ParseError(format!(
1493 "Negative union branch index {raw}"
1494 )));
1495 }
1496 usize::try_from(raw).map_err(|_| {
1497 ArrowError::ParseError(format!(
1498 "Union branch index {raw} does not fit into usize on this platform ({}-bit)",
1499 (usize::BITS as usize)
1500 ))
1501 })
1502 }
1503
1504 #[inline]
1505 fn emit_to(&mut self, reader_idx: usize) -> Result<&mut Decoder, ArrowError> {
1506 let branches_len = self.branches.len();
1507 let Some(reader_branch) = self.branches.get_mut(reader_idx) else {
1508 return Err(ArrowError::ParseError(format!(
1509 "Union branch index {reader_idx} out of range ({branches_len} branches)"
1510 )));
1511 };
1512 self.type_ids.push(self.reader_type_codes[reader_idx]);
1513 self.offsets.push(self.counts[reader_idx]);
1514 self.counts[reader_idx] += 1;
1515 Ok(reader_branch)
1516 }
1517
1518 #[inline]
1519 fn on_decoder<F>(&mut self, fallback_idx: usize, action: F) -> Result<(), ArrowError>
1520 where
1521 F: FnOnce(&mut Decoder) -> Result<(), ArrowError>,
1522 {
1523 if let UnionReadPlan::ToSingle { target, .. } = &mut self.plan {
1524 return action(target);
1525 }
1526 let reader_idx = match &self.plan {
1527 UnionReadPlan::FromSingle { reader_idx, .. } => *reader_idx,
1528 _ => fallback_idx,
1529 };
1530 self.emit_to(reader_idx).and_then(action)
1531 }
1532
1533 fn append_null(&mut self) -> Result<(), ArrowError> {
1534 self.on_decoder(self.null_emit_idx, |decoder| decoder.append_null())
1535 }
1536
1537 fn append_default(&mut self, lit: &AvroLiteral) -> Result<(), ArrowError> {
1538 self.on_decoder(self.default_emit_idx, |decoder| decoder.append_default(lit))
1539 }
1540
1541 fn decode(&mut self, buf: &mut AvroCursor<'_>) -> Result<(), ArrowError> {
1542 let (reader_idx, promotion) = match &mut self.plan {
1543 UnionReadPlan::Passthrough => (Self::read_tag(buf)?, Promotion::Direct),
1544 UnionReadPlan::ReaderUnion { lookup_table } => {
1545 let idx = Self::read_tag(buf)?;
1546 lookup_table.resolve(idx).ok_or_else(|| {
1547 ArrowError::ParseError(format!(
1548 "Union branch index {idx} not resolvable by reader schema"
1549 ))
1550 })?
1551 }
1552 UnionReadPlan::FromSingle {
1553 reader_idx,
1554 promotion,
1555 } => (*reader_idx, *promotion),
1556 UnionReadPlan::ToSingle {
1557 target,
1558 lookup_table,
1559 } => {
1560 let idx = Self::read_tag(buf)?;
1561 return match lookup_table.resolve(idx) {
1562 Some((_, promotion)) => target.decode_with_promotion(buf, promotion),
1563 None => Err(ArrowError::ParseError(format!(
1564 "Writer union branch {idx} does not resolve to reader type"
1565 ))),
1566 };
1567 }
1568 };
1569 let decoder = self.emit_to(reader_idx)?;
1570 decoder.decode_with_promotion(buf, promotion)
1571 }
1572
1573 fn flush(&mut self, nulls: Option<NullBuffer>) -> Result<ArrayRef, ArrowError> {
1574 if let UnionReadPlan::ToSingle { target, .. } = &mut self.plan {
1575 return target.flush(nulls);
1576 }
1577 debug_assert!(
1578 nulls.is_none(),
1579 "UnionArray does not accept a validity bitmap; \
1580 nulls should have been materialized as a Null child during decode"
1581 );
1582 let children = self
1583 .branches
1584 .iter_mut()
1585 .map(|d| d.flush(None))
1586 .collect::<Result<Vec<_>, _>>()?;
1587 let arr = UnionArray::try_new(
1588 self.fields.clone(),
1589 flush_values(&mut self.type_ids).into_iter().collect(),
1590 Some(flush_values(&mut self.offsets).into_iter().collect()),
1591 children,
1592 )
1593 .map_err(|e| ArrowError::ParseError(e.to_string()))?;
1594 Ok(Arc::new(arr))
1595 }
1596}
1597
1598#[derive(Debug, Default)]
1599struct UnionDecoderBuilder {
1600 fields: Option<UnionFields>,
1601 branches: Option<Vec<Decoder>>,
1602 resolved: Option<ResolvedUnion>,
1603 target: Option<Box<Decoder>>,
1604}
1605
1606impl UnionDecoderBuilder {
1607 fn new() -> Self {
1608 Self::default()
1609 }
1610
1611 fn with_fields(mut self, fields: UnionFields) -> Self {
1612 self.fields = Some(fields);
1613 self
1614 }
1615
1616 fn with_branches(mut self, branches: Vec<Decoder>) -> Self {
1617 self.branches = Some(branches);
1618 self
1619 }
1620
1621 fn with_resolved_union(mut self, resolved_union: ResolvedUnion) -> Self {
1622 self.resolved = Some(resolved_union);
1623 self
1624 }
1625
1626 fn with_target(mut self, target: Box<Decoder>) -> Self {
1627 self.target = Some(target);
1628 self
1629 }
1630
1631 fn build(self) -> Result<UnionDecoder, ArrowError> {
1632 match (self.resolved, self.fields, self.branches, self.target) {
1633 (resolved, Some(fields), Some(branches), None) => {
1634 UnionDecoder::try_new(fields, branches, resolved)
1635 }
1636 (Some(info), None, None, Some(target))
1637 if info.writer_is_union && !info.reader_is_union =>
1638 {
1639 UnionDecoder::try_new_from_writer_union(info, target)
1640 }
1641 _ => Err(ArrowError::InvalidArgumentError(
1642 "Invalid UnionDecoderBuilder configuration: expected either \
1643 (fields + branches + resolved) with no target for reader-unions, or \
1644 (resolved + target) with no fields/branches for writer-union to single."
1645 .to_string(),
1646 )),
1647 }
1648 }
1649}
1650
/// What `process_blockwise` does with a block whose item count is negative,
/// i.e. a block that is prefixed with its size in bytes.
#[derive(Clone, Copy, Debug)]
enum NegativeBlockBehavior {
    /// Still hand every item of the block to the per-item callback.
    ProcessItems,
    /// Consume the announced number of bytes in one step, without calling the
    /// per-item callback.
    SkipBySize,
}
1656
1657#[inline]
1658fn skip_blocks(
1659 buf: &mut AvroCursor,
1660 mut skip_item: impl FnMut(&mut AvroCursor) -> Result<(), ArrowError>,
1661) -> Result<usize, ArrowError> {
1662 process_blockwise(
1663 buf,
1664 move |c| skip_item(c),
1665 NegativeBlockBehavior::SkipBySize,
1666 )
1667}
1668
1669#[inline]
1670fn flush_dict(
1671 indices: &mut Vec<i32>,
1672 symbols: &[String],
1673 nulls: Option<NullBuffer>,
1674) -> Result<ArrayRef, ArrowError> {
1675 let keys = flush_primitive::<Int32Type>(indices, nulls);
1676 let values = Arc::new(StringArray::from_iter_values(
1677 symbols.iter().map(|s| s.as_str()),
1678 ));
1679 DictionaryArray::try_new(keys, values)
1680 .map_err(|e| ArrowError::ParseError(e.to_string()))
1681 .map(|arr| Arc::new(arr) as ArrayRef)
1682}
1683
1684#[inline]
1685fn read_blocks(
1686 buf: &mut AvroCursor,
1687 decode_entry: impl FnMut(&mut AvroCursor) -> Result<(), ArrowError>,
1688) -> Result<usize, ArrowError> {
1689 process_blockwise(buf, decode_entry, NegativeBlockBehavior::ProcessItems)
1690}
1691
1692#[inline]
1693fn process_blockwise(
1694 buf: &mut AvroCursor,
1695 mut on_item: impl FnMut(&mut AvroCursor) -> Result<(), ArrowError>,
1696 negative_behavior: NegativeBlockBehavior,
1697) -> Result<usize, ArrowError> {
1698 let mut total = 0usize;
1699 loop {
1700 let block_count = buf.get_long()?;
1705 match block_count.cmp(&0) {
1706 Ordering::Equal => break,
1707 Ordering::Less => {
1708 let count = (-block_count) as usize;
1709 let size_in_bytes = buf.get_long()? as usize;
1711 match negative_behavior {
1712 NegativeBlockBehavior::ProcessItems => {
1713 for _ in 0..count {
1715 on_item(buf)?;
1716 }
1717 }
1718 NegativeBlockBehavior::SkipBySize => {
1719 let _ = buf.get_fixed(size_in_bytes)?;
1721 }
1722 }
1723 total += count;
1724 }
1725 Ordering::Greater => {
1726 let count = block_count as usize;
1727 for _ in 0..count {
1728 on_item(buf)?;
1729 }
1730 total += count;
1731 }
1732 }
1733 }
1734 Ok(total)
1735}
1736
1737#[inline]
1738fn flush_values<T>(values: &mut Vec<T>) -> Vec<T> {
1739 std::mem::replace(values, Vec::with_capacity(DEFAULT_CAPACITY))
1740}
1741
1742#[inline]
1743fn flush_offsets(offsets: &mut OffsetBufferBuilder<i32>) -> OffsetBuffer<i32> {
1744 std::mem::replace(offsets, OffsetBufferBuilder::new(DEFAULT_CAPACITY)).finish()
1745}
1746
1747#[inline]
1748fn flush_primitive<T: ArrowPrimitiveType>(
1749 values: &mut Vec<T::Native>,
1750 nulls: Option<NullBuffer>,
1751) -> PrimitiveArray<T> {
1752 PrimitiveArray::new(flush_values(values).into(), nulls)
1753}
1754
1755#[inline]
1756fn read_decimal_bytes_be<const N: usize>(
1757 buf: &mut AvroCursor<'_>,
1758 size: &Option<usize>,
1759) -> Result<[u8; N], ArrowError> {
1760 match size {
1761 Some(n) if *n == N => {
1762 let raw = buf.get_fixed(N)?;
1763 let mut arr = [0u8; N];
1764 arr.copy_from_slice(raw);
1765 Ok(arr)
1766 }
1767 Some(n) => {
1768 let raw = buf.get_fixed(*n)?;
1769 sign_cast_to::<N>(raw)
1770 }
1771 None => {
1772 let raw = buf.get_bytes()?;
1773 sign_cast_to::<N>(raw)
1774 }
1775 }
1776}
1777
1778#[inline]
1787fn sign_cast_to<const N: usize>(raw: &[u8]) -> Result<[u8; N], ArrowError> {
1788 let len = raw.len();
1789 if len == N {
1791 let mut out = [0u8; N];
1792 out.copy_from_slice(raw);
1793 return Ok(out);
1794 }
1795 let first = raw.first().copied().unwrap_or(0u8);
1797 let sign_byte = if (first & 0x80) == 0 { 0x00 } else { 0xFF };
1798 let mut out = [sign_byte; N];
1800 if len > N {
1801 let extra = len - N;
1804 if raw[..extra].iter().any(|&b| b != sign_byte) {
1806 return Err(ArrowError::ParseError(format!(
1807 "Decimal value with {} bytes cannot be represented in {} bytes without overflow",
1808 len, N
1809 )));
1810 }
1811 if N > 0 {
1812 let first_kept = raw[extra];
1813 let sign_bit_mismatch = ((first_kept ^ sign_byte) & 0x80) != 0;
1814 if sign_bit_mismatch {
1815 return Err(ArrowError::ParseError(format!(
1816 "Decimal value with {} bytes cannot be represented in {} bytes without overflow",
1817 len, N
1818 )));
1819 }
1820 }
1821 out.copy_from_slice(&raw[extra..]);
1822 return Ok(out);
1823 }
1824 out[N - len..].copy_from_slice(raw);
1825 Ok(out)
1826}
1827
/// Reports whether positions `i` and `j` of `arr` hold equal values. Two
/// nulls are equal; a null never equals a non-null value.
#[cfg(feature = "avro_custom_types")]
#[inline]
fn values_equal_at(arr: &dyn Array, i: usize, j: usize) -> bool {
    let (i_null, j_null) = (arr.is_null(i), arr.is_null(j));
    if i_null || j_null {
        return i_null && j_null;
    }
    // Compare the two positions as one-element slices.
    arr.slice(i, 1) == arr.slice(j, 1)
}
1841
1842#[derive(Debug)]
1843struct Projector {
1844 writer_to_reader: Arc<[Option<usize>]>,
1845 skip_decoders: Vec<Option<Skipper>>,
1846 field_defaults: Vec<Option<AvroLiteral>>,
1847 default_injections: Arc<[(usize, AvroLiteral)]>,
1848}
1849
1850#[derive(Debug)]
1851struct ProjectorBuilder<'a> {
1852 rec: &'a ResolvedRecord,
1853 reader_fields: Arc<[AvroField]>,
1854}
1855
1856impl<'a> ProjectorBuilder<'a> {
1857 #[inline]
1858 fn try_new(rec: &'a ResolvedRecord, reader_fields: &Arc<[AvroField]>) -> Self {
1859 Self {
1860 rec,
1861 reader_fields: reader_fields.clone(),
1862 }
1863 }
1864
1865 #[inline]
1866 fn build(self) -> Result<Projector, ArrowError> {
1867 let reader_fields = self.reader_fields;
1868 let mut field_defaults: Vec<Option<AvroLiteral>> = Vec::with_capacity(reader_fields.len());
1869 for avro_field in reader_fields.as_ref() {
1870 if let Some(ResolutionInfo::DefaultValue(lit)) =
1871 avro_field.data_type().resolution.as_ref()
1872 {
1873 field_defaults.push(Some(lit.clone()));
1874 } else {
1875 field_defaults.push(None);
1876 }
1877 }
1878 let mut default_injections: Vec<(usize, AvroLiteral)> =
1879 Vec::with_capacity(self.rec.default_fields.len());
1880 for &idx in self.rec.default_fields.as_ref() {
1881 let lit = field_defaults
1882 .get(idx)
1883 .and_then(|lit| lit.clone())
1884 .unwrap_or(AvroLiteral::Null);
1885 default_injections.push((idx, lit));
1886 }
1887 let mut skip_decoders: Vec<Option<Skipper>> =
1888 Vec::with_capacity(self.rec.skip_fields.len());
1889 for datatype in self.rec.skip_fields.as_ref() {
1890 let skipper = match datatype {
1891 Some(datatype) => Some(Skipper::from_avro(datatype)?),
1892 None => None,
1893 };
1894 skip_decoders.push(skipper);
1895 }
1896 Ok(Projector {
1897 writer_to_reader: self.rec.writer_to_reader.clone(),
1898 skip_decoders,
1899 field_defaults,
1900 default_injections: default_injections.into(),
1901 })
1902 }
1903}
1904
1905impl Projector {
1906 #[inline]
1907 fn project_default(&self, decoder: &mut Decoder, index: usize) -> Result<(), ArrowError> {
1908 if let Some(default_literal) = self.field_defaults[index].as_ref() {
1915 decoder.append_default(default_literal)
1916 } else {
1917 decoder.append_null()
1918 }
1919 }
1920
1921 #[inline]
1922 fn project_record(
1923 &mut self,
1924 buf: &mut AvroCursor<'_>,
1925 encodings: &mut [Decoder],
1926 ) -> Result<(), ArrowError> {
1927 debug_assert_eq!(
1928 self.writer_to_reader.len(),
1929 self.skip_decoders.len(),
1930 "internal invariant: mapping and skipper lists must have equal length"
1931 );
1932 for (i, (mapping, skipper_opt)) in self
1933 .writer_to_reader
1934 .iter()
1935 .zip(self.skip_decoders.iter_mut())
1936 .enumerate()
1937 {
1938 match (mapping, skipper_opt.as_mut()) {
1939 (Some(reader_index), _) => encodings[*reader_index].decode(buf)?,
1940 (None, Some(skipper)) => skipper.skip(buf)?,
1941 (None, None) => {
1942 return Err(ArrowError::SchemaError(format!(
1943 "No skipper available for writer-only field at index {i}",
1944 )));
1945 }
1946 }
1947 }
1948 for (reader_index, lit) in self.default_injections.as_ref() {
1949 encodings[*reader_index].append_default(lit)?;
1950 }
1951 Ok(())
1952 }
1953}
1954
1955#[derive(Debug)]
1961enum Skipper {
1962 Null,
1963 Boolean,
1964 Int32,
1965 Int64,
1966 Float32,
1967 Float64,
1968 Bytes,
1969 String,
1970 TimeMicros,
1971 TimestampMillis,
1972 TimestampMicros,
1973 TimestampNanos,
1974 Fixed(usize),
1975 Decimal(Option<usize>),
1976 UuidString,
1977 Enum,
1978 DurationFixed12,
1979 List(Box<Skipper>),
1980 Map(Box<Skipper>),
1981 Struct(Vec<Skipper>),
1982 Union(Vec<Skipper>),
1983 Nullable(Nullability, Box<Skipper>),
1984 #[cfg(feature = "avro_custom_types")]
1985 RunEndEncoded(Box<Skipper>),
1986}
1987
1988impl Skipper {
1989 fn from_avro(dt: &AvroDataType) -> Result<Self, ArrowError> {
1990 let mut base = match dt.codec() {
1991 Codec::Null => Self::Null,
1992 Codec::Boolean => Self::Boolean,
1993 Codec::Int32 | Codec::Date32 | Codec::TimeMillis => Self::Int32,
1994 Codec::Int64 => Self::Int64,
1995 Codec::TimeMicros => Self::TimeMicros,
1996 Codec::TimestampMillis(_) => Self::TimestampMillis,
1997 Codec::TimestampMicros(_) => Self::TimestampMicros,
1998 Codec::TimestampNanos(_) => Self::TimestampNanos,
1999 #[cfg(feature = "avro_custom_types")]
2000 Codec::DurationNanos
2001 | Codec::DurationMicros
2002 | Codec::DurationMillis
2003 | Codec::DurationSeconds => Self::Int64,
2004 Codec::Float32 => Self::Float32,
2005 Codec::Float64 => Self::Float64,
2006 Codec::Binary => Self::Bytes,
2007 Codec::Utf8 | Codec::Utf8View => Self::String,
2008 Codec::Fixed(sz) => Self::Fixed(*sz as usize),
2009 Codec::Decimal(_, _, size) => Self::Decimal(*size),
2010 Codec::Uuid => Self::UuidString, Codec::Enum(_) => Self::Enum,
2012 Codec::List(item) => Self::List(Box::new(Skipper::from_avro(item)?)),
2013 Codec::Struct(fields) => Self::Struct(
2014 fields
2015 .iter()
2016 .map(|f| Skipper::from_avro(f.data_type()))
2017 .collect::<Result<_, _>>()?,
2018 ),
2019 Codec::Map(values) => Self::Map(Box::new(Skipper::from_avro(values)?)),
2020 Codec::Interval => Self::DurationFixed12,
2021 Codec::Union(encodings, _, _) => {
2022 let max_addr = (i32::MAX as usize) + 1;
2023 if encodings.len() > max_addr {
2024 return Err(ArrowError::SchemaError(format!(
2025 "Writer union has {} branches, which exceeds the maximum addressable \
2026 branches by an Avro int tag ({} + 1).",
2027 encodings.len(),
2028 i32::MAX
2029 )));
2030 }
2031 Self::Union(
2032 encodings
2033 .iter()
2034 .map(Skipper::from_avro)
2035 .collect::<Result<_, _>>()?,
2036 )
2037 }
2038 #[cfg(feature = "avro_custom_types")]
2039 Codec::RunEndEncoded(inner, _w) => {
2040 Self::RunEndEncoded(Box::new(Skipper::from_avro(inner)?))
2041 }
2042 };
2043 if let Some(n) = dt.nullability() {
2044 base = Self::Nullable(n, Box::new(base));
2045 }
2046 Ok(base)
2047 }
2048
2049 fn skip(&mut self, buf: &mut AvroCursor<'_>) -> Result<(), ArrowError> {
2050 match self {
2051 Self::Null => Ok(()),
2052 Self::Boolean => {
2053 buf.get_bool()?;
2054 Ok(())
2055 }
2056 Self::Int32 => {
2057 buf.get_int()?;
2058 Ok(())
2059 }
2060 Self::Int64
2061 | Self::TimeMicros
2062 | Self::TimestampMillis
2063 | Self::TimestampMicros
2064 | Self::TimestampNanos => {
2065 buf.get_long()?;
2066 Ok(())
2067 }
2068 Self::Float32 => {
2069 buf.get_float()?;
2070 Ok(())
2071 }
2072 Self::Float64 => {
2073 buf.get_double()?;
2074 Ok(())
2075 }
2076 Self::Bytes | Self::String | Self::UuidString => {
2077 buf.get_bytes()?;
2078 Ok(())
2079 }
2080 Self::Fixed(sz) => {
2081 buf.get_fixed(*sz)?;
2082 Ok(())
2083 }
2084 Self::Decimal(size) => {
2085 if let Some(s) = size {
2086 buf.get_fixed(*s)
2087 } else {
2088 buf.get_bytes()
2089 }?;
2090 Ok(())
2091 }
2092 Self::Enum => {
2093 buf.get_int()?;
2094 Ok(())
2095 }
2096 Self::DurationFixed12 => {
2097 buf.get_fixed(12)?;
2098 Ok(())
2099 }
2100 Self::List(item) => {
2101 skip_blocks(buf, |c| item.skip(c))?;
2102 Ok(())
2103 }
2104 Self::Map(value) => {
2105 skip_blocks(buf, |c| {
2106 c.get_bytes()?; value.skip(c)
2108 })?;
2109 Ok(())
2110 }
2111 Self::Struct(fields) => {
2112 for f in fields.iter_mut() {
2113 f.skip(buf)?
2114 }
2115 Ok(())
2116 }
2117 Self::Union(encodings) => {
2118 let raw = buf.get_long()?;
2120 if raw < 0 {
2121 return Err(ArrowError::ParseError(format!(
2122 "Negative union branch index {raw}"
2123 )));
2124 }
2125 let idx: usize = usize::try_from(raw).map_err(|_| {
2126 ArrowError::ParseError(format!(
2127 "Union branch index {raw} does not fit into usize on this platform ({}-bit)",
2128 (usize::BITS as usize)
2129 ))
2130 })?;
2131 let Some(encoding) = encodings.get_mut(idx) else {
2132 return Err(ArrowError::ParseError(format!(
2133 "Union branch index {idx} out of range for skipper ({} branches)",
2134 encodings.len()
2135 )));
2136 };
2137 encoding.skip(buf)
2138 }
2139 Self::Nullable(order, inner) => {
2140 let branch = buf.read_vlq()?;
2141 let is_not_null = match *order {
2142 Nullability::NullFirst => branch != 0,
2143 Nullability::NullSecond => branch == 0,
2144 };
2145 if is_not_null {
2146 inner.skip(buf)?;
2147 }
2148 Ok(())
2149 }
2150 #[cfg(feature = "avro_custom_types")]
2151 Self::RunEndEncoded(inner) => inner.skip(buf),
2152 }
2153 }
2154}
2155
2156#[cfg(test)]
2157mod tests {
2158 use super::*;
2159 use crate::codec::AvroFieldBuilder;
2160 use crate::schema::{Attributes, ComplexType, Field, PrimitiveType, Record, Schema, TypeName};
2161 use arrow_array::cast::AsArray;
2162 use indexmap::IndexMap;
2163 use std::collections::HashMap;
2164
2165 fn encode_avro_int(value: i32) -> Vec<u8> {
2166 let mut buf = Vec::new();
2167 let mut v = (value << 1) ^ (value >> 31);
2168 while v & !0x7F != 0 {
2169 buf.push(((v & 0x7F) | 0x80) as u8);
2170 v >>= 7;
2171 }
2172 buf.push(v as u8);
2173 buf
2174 }
2175
2176 fn encode_avro_long(value: i64) -> Vec<u8> {
2177 let mut buf = Vec::new();
2178 let mut v = (value << 1) ^ (value >> 63);
2179 while v & !0x7F != 0 {
2180 buf.push(((v & 0x7F) | 0x80) as u8);
2181 v >>= 7;
2182 }
2183 buf.push(v as u8);
2184 buf
2185 }
2186
2187 fn encode_avro_bytes(bytes: &[u8]) -> Vec<u8> {
2188 let mut buf = encode_avro_long(bytes.len() as i64);
2189 buf.extend_from_slice(bytes);
2190 buf
2191 }
2192
2193 fn avro_from_codec(codec: Codec) -> AvroDataType {
2194 AvroDataType::new(codec, Default::default(), None)
2195 }
2196
2197 fn resolved_root_datatype(
2198 writer: Schema<'static>,
2199 reader: Schema<'static>,
2200 use_utf8view: bool,
2201 strict_mode: bool,
2202 ) -> AvroDataType {
2203 let writer_record = Schema::Complex(ComplexType::Record(Record {
2205 name: "Root",
2206 namespace: None,
2207 doc: None,
2208 aliases: vec![],
2209 fields: vec![Field {
2210 name: "v",
2211 r#type: writer,
2212 default: None,
2213 doc: None,
2214 aliases: vec![],
2215 }],
2216 attributes: Attributes::default(),
2217 }));
2218
2219 let reader_record = Schema::Complex(ComplexType::Record(Record {
2221 name: "Root",
2222 namespace: None,
2223 doc: None,
2224 aliases: vec![],
2225 fields: vec![Field {
2226 name: "v",
2227 r#type: reader,
2228 default: None,
2229 doc: None,
2230 aliases: vec![],
2231 }],
2232 attributes: Attributes::default(),
2233 }));
2234
2235 let field = AvroFieldBuilder::new(&writer_record)
2237 .with_reader_schema(&reader_record)
2238 .with_utf8view(use_utf8view)
2239 .with_strict_mode(strict_mode)
2240 .build()
2241 .expect("schema resolution should succeed");
2242
2243 match field.data_type().codec() {
2244 Codec::Struct(fields) => fields[0].data_type().clone(),
2245 other => panic!("expected wrapper struct, got {other:?}"),
2246 }
2247 }
2248
2249 fn decoder_for_promotion(
2250 writer: PrimitiveType,
2251 reader: PrimitiveType,
2252 use_utf8view: bool,
2253 ) -> Decoder {
2254 let ws = Schema::TypeName(TypeName::Primitive(writer));
2255 let rs = Schema::TypeName(TypeName::Primitive(reader));
2256 let dt = resolved_root_datatype(ws, rs, use_utf8view, false);
2257 Decoder::try_new(&dt).unwrap()
2258 }
2259
2260 fn make_avro_dt(codec: Codec, nullability: Option<Nullability>) -> AvroDataType {
2261 AvroDataType::new(codec, HashMap::new(), nullability)
2262 }
2263
2264 #[cfg(feature = "avro_custom_types")]
2265 fn encode_vlq_u64(mut x: u64) -> Vec<u8> {
2266 let mut out = Vec::with_capacity(10);
2267 while x >= 0x80 {
2268 out.push((x as u8) | 0x80);
2269 x >>= 7;
2270 }
2271 out.push(x as u8);
2272 out
2273 }
2274
2275 #[test]
2276 fn test_union_resolution_writer_union_reader_union_reorder_and_promotion_dense() {
2277 let ws = Schema::Union(vec![
2278 Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
2279 Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2280 ]);
2281 let rs = Schema::Union(vec![
2282 Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2283 Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
2284 ]);
2285
2286 let dt = resolved_root_datatype(ws, rs, false, false);
2287 let mut dec = Decoder::try_new(&dt).unwrap();
2288
2289 let mut rec1 = encode_avro_long(0);
2290 rec1.extend(encode_avro_int(7));
2291 let mut cur1 = AvroCursor::new(&rec1);
2292 dec.decode(&mut cur1).unwrap();
2293
2294 let mut rec2 = encode_avro_long(1);
2295 rec2.extend(encode_avro_bytes("abc".as_bytes()));
2296 let mut cur2 = AvroCursor::new(&rec2);
2297 dec.decode(&mut cur2).unwrap();
2298
2299 let arr = dec.flush(None).unwrap();
2300 let ua = arr
2301 .as_any()
2302 .downcast_ref::<UnionArray>()
2303 .expect("dense union output");
2304
2305 assert_eq!(
2306 ua.type_id(0),
2307 1,
2308 "first value must select reader 'long' branch"
2309 );
2310 assert_eq!(ua.value_offset(0), 0);
2311
2312 assert_eq!(
2313 ua.type_id(1),
2314 0,
2315 "second value must select reader 'string' branch"
2316 );
2317 assert_eq!(ua.value_offset(1), 0);
2318
2319 let long_child = ua.child(1).as_any().downcast_ref::<Int64Array>().unwrap();
2320 assert_eq!(long_child.len(), 1);
2321 assert_eq!(long_child.value(0), 7);
2322
2323 let str_child = ua.child(0).as_any().downcast_ref::<StringArray>().unwrap();
2324 assert_eq!(str_child.len(), 1);
2325 assert_eq!(str_child.value(0), "abc");
2326 }
2327
2328 #[test]
2329 fn test_union_resolution_writer_union_reader_nonunion_promotion_int_to_long() {
2330 let ws = Schema::Union(vec![
2331 Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
2332 Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2333 ]);
2334 let rs = Schema::TypeName(TypeName::Primitive(PrimitiveType::Long));
2335
2336 let dt = resolved_root_datatype(ws, rs, false, false);
2337 let mut dec = Decoder::try_new(&dt).unwrap();
2338
2339 let mut data = encode_avro_long(0);
2340 data.extend(encode_avro_int(5));
2341 let mut cur = AvroCursor::new(&data);
2342 dec.decode(&mut cur).unwrap();
2343
2344 let arr = dec.flush(None).unwrap();
2345 let out = arr.as_any().downcast_ref::<Int64Array>().unwrap();
2346 assert_eq!(out.len(), 1);
2347 assert_eq!(out.value(0), 5);
2348 }
2349
2350 #[test]
2351 fn test_union_resolution_writer_union_reader_nonunion_mismatch_errors() {
2352 let ws = Schema::Union(vec![
2353 Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
2354 Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2355 ]);
2356 let rs = Schema::TypeName(TypeName::Primitive(PrimitiveType::Long));
2357
2358 let dt = resolved_root_datatype(ws, rs, false, false);
2359 let mut dec = Decoder::try_new(&dt).unwrap();
2360
2361 let mut data = encode_avro_long(1);
2362 data.extend(encode_avro_bytes("z".as_bytes()));
2363 let mut cur = AvroCursor::new(&data);
2364 let res = dec.decode(&mut cur);
2365 assert!(
2366 res.is_err(),
2367 "expected error when writer union branch does not resolve to reader non-union type"
2368 );
2369 }
2370
2371 #[test]
2372 fn test_union_resolution_writer_nonunion_reader_union_selects_matching_branch() {
2373 let ws = Schema::TypeName(TypeName::Primitive(PrimitiveType::Int));
2374 let rs = Schema::Union(vec![
2375 Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2376 Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
2377 ]);
2378
2379 let dt = resolved_root_datatype(ws, rs, false, false);
2380 let mut dec = Decoder::try_new(&dt).unwrap();
2381
2382 let data = encode_avro_int(6);
2383 let mut cur = AvroCursor::new(&data);
2384 dec.decode(&mut cur).unwrap();
2385
2386 let arr = dec.flush(None).unwrap();
2387 let ua = arr
2388 .as_any()
2389 .downcast_ref::<UnionArray>()
2390 .expect("dense union output");
2391 assert_eq!(ua.len(), 1);
2392 assert_eq!(
2393 ua.type_id(0),
2394 1,
2395 "must resolve to reader 'long' branch (type_id 1)"
2396 );
2397 assert_eq!(ua.value_offset(0), 0);
2398
2399 let long_child = ua.child(1).as_any().downcast_ref::<Int64Array>().unwrap();
2400 assert_eq!(long_child.len(), 1);
2401 assert_eq!(long_child.value(0), 6);
2402
2403 let str_child = ua.child(0).as_any().downcast_ref::<StringArray>().unwrap();
2404 assert_eq!(str_child.len(), 0, "string branch must be empty");
2405 }
2406
2407 #[test]
2408 fn test_union_resolution_writer_union_reader_union_unmapped_branch_errors() {
2409 let ws = Schema::Union(vec![
2410 Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
2411 Schema::TypeName(TypeName::Primitive(PrimitiveType::Boolean)),
2412 ]);
2413 let rs = Schema::Union(vec![
2414 Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
2415 Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
2416 ]);
2417
2418 let dt = resolved_root_datatype(ws, rs, false, false);
2419 let mut dec = Decoder::try_new(&dt).unwrap();
2420
2421 let mut data = encode_avro_long(1);
2422 data.push(1);
2423 let mut cur = AvroCursor::new(&data);
2424 let res = dec.decode(&mut cur);
2425 assert!(
2426 res.is_err(),
2427 "expected error for unmapped writer 'boolean' branch"
2428 );
2429 }
2430
2431 #[test]
2432 fn test_schema_resolution_promotion_int_to_long() {
2433 let mut dec = decoder_for_promotion(PrimitiveType::Int, PrimitiveType::Long, false);
2434 assert!(matches!(dec, Decoder::Int32ToInt64(_)));
2435 for v in [0, 1, -2, 123456] {
2436 let data = encode_avro_int(v);
2437 let mut cur = AvroCursor::new(&data);
2438 dec.decode(&mut cur).unwrap();
2439 }
2440 let arr = dec.flush(None).unwrap();
2441 let a = arr.as_any().downcast_ref::<Int64Array>().unwrap();
2442 assert_eq!(a.value(0), 0);
2443 assert_eq!(a.value(1), 1);
2444 assert_eq!(a.value(2), -2);
2445 assert_eq!(a.value(3), 123456);
2446 }
2447
2448 #[test]
2449 fn test_schema_resolution_promotion_int_to_float() {
2450 let mut dec = decoder_for_promotion(PrimitiveType::Int, PrimitiveType::Float, false);
2451 assert!(matches!(dec, Decoder::Int32ToFloat32(_)));
2452 for v in [0, 42, -7] {
2453 let data = encode_avro_int(v);
2454 let mut cur = AvroCursor::new(&data);
2455 dec.decode(&mut cur).unwrap();
2456 }
2457 let arr = dec.flush(None).unwrap();
2458 let a = arr.as_any().downcast_ref::<Float32Array>().unwrap();
2459 assert_eq!(a.value(0), 0.0);
2460 assert_eq!(a.value(1), 42.0);
2461 assert_eq!(a.value(2), -7.0);
2462 }
2463
2464 #[test]
2465 fn test_schema_resolution_promotion_int_to_double() {
2466 let mut dec = decoder_for_promotion(PrimitiveType::Int, PrimitiveType::Double, false);
2467 assert!(matches!(dec, Decoder::Int32ToFloat64(_)));
2468 for v in [1, -1, 10_000] {
2469 let data = encode_avro_int(v);
2470 let mut cur = AvroCursor::new(&data);
2471 dec.decode(&mut cur).unwrap();
2472 }
2473 let arr = dec.flush(None).unwrap();
2474 let a = arr.as_any().downcast_ref::<Float64Array>().unwrap();
2475 assert_eq!(a.value(0), 1.0);
2476 assert_eq!(a.value(1), -1.0);
2477 assert_eq!(a.value(2), 10_000.0);
2478 }
2479
2480 #[test]
2481 fn test_schema_resolution_promotion_long_to_float() {
2482 let mut dec = decoder_for_promotion(PrimitiveType::Long, PrimitiveType::Float, false);
2483 assert!(matches!(dec, Decoder::Int64ToFloat32(_)));
2484 for v in [0_i64, 1_000_000_i64, -123_i64] {
2485 let data = encode_avro_long(v);
2486 let mut cur = AvroCursor::new(&data);
2487 dec.decode(&mut cur).unwrap();
2488 }
2489 let arr = dec.flush(None).unwrap();
2490 let a = arr.as_any().downcast_ref::<Float32Array>().unwrap();
2491 assert_eq!(a.value(0), 0.0);
2492 assert_eq!(a.value(1), 1_000_000.0);
2493 assert_eq!(a.value(2), -123.0);
2494 }
2495
2496 #[test]
2497 fn test_schema_resolution_promotion_long_to_double() {
2498 let mut dec = decoder_for_promotion(PrimitiveType::Long, PrimitiveType::Double, false);
2499 assert!(matches!(dec, Decoder::Int64ToFloat64(_)));
2500 for v in [2_i64, -2_i64, 9_223_372_i64] {
2501 let data = encode_avro_long(v);
2502 let mut cur = AvroCursor::new(&data);
2503 dec.decode(&mut cur).unwrap();
2504 }
2505 let arr = dec.flush(None).unwrap();
2506 let a = arr.as_any().downcast_ref::<Float64Array>().unwrap();
2507 assert_eq!(a.value(0), 2.0);
2508 assert_eq!(a.value(1), -2.0);
2509 assert_eq!(a.value(2), 9_223_372.0);
2510 }
2511
2512 #[test]
2513 fn test_schema_resolution_promotion_float_to_double() {
2514 let mut dec = decoder_for_promotion(PrimitiveType::Float, PrimitiveType::Double, false);
2515 assert!(matches!(dec, Decoder::Float32ToFloat64(_)));
2516 for v in [0.5_f32, -3.25_f32, 1.0e6_f32] {
2517 let data = v.to_le_bytes().to_vec();
2518 let mut cur = AvroCursor::new(&data);
2519 dec.decode(&mut cur).unwrap();
2520 }
2521 let arr = dec.flush(None).unwrap();
2522 let a = arr.as_any().downcast_ref::<Float64Array>().unwrap();
2523 assert_eq!(a.value(0), 0.5_f64);
2524 assert_eq!(a.value(1), -3.25_f64);
2525 assert_eq!(a.value(2), 1.0e6_f64);
2526 }
2527
2528 #[test]
2529 fn test_schema_resolution_promotion_bytes_to_string_utf8() {
2530 let mut dec = decoder_for_promotion(PrimitiveType::Bytes, PrimitiveType::String, false);
2531 assert!(matches!(dec, Decoder::BytesToString(_, _)));
2532 for s in ["hello", "world", "héllo"] {
2533 let data = encode_avro_bytes(s.as_bytes());
2534 let mut cur = AvroCursor::new(&data);
2535 dec.decode(&mut cur).unwrap();
2536 }
2537 let arr = dec.flush(None).unwrap();
2538 let a = arr.as_any().downcast_ref::<StringArray>().unwrap();
2539 assert_eq!(a.value(0), "hello");
2540 assert_eq!(a.value(1), "world");
2541 assert_eq!(a.value(2), "héllo");
2542 }
2543
2544 #[test]
2545 fn test_schema_resolution_promotion_bytes_to_string_utf8view_enabled() {
2546 let mut dec = decoder_for_promotion(PrimitiveType::Bytes, PrimitiveType::String, true);
2547 assert!(matches!(dec, Decoder::BytesToString(_, _)));
2548 let data = encode_avro_bytes("abc".as_bytes());
2549 let mut cur = AvroCursor::new(&data);
2550 dec.decode(&mut cur).unwrap();
2551 let arr = dec.flush(None).unwrap();
2552 let a = arr.as_any().downcast_ref::<StringArray>().unwrap();
2553 assert_eq!(a.value(0), "abc");
2554 }
2555
2556 #[test]
2557 fn test_schema_resolution_promotion_string_to_bytes() {
2558 let mut dec = decoder_for_promotion(PrimitiveType::String, PrimitiveType::Bytes, false);
2559 assert!(matches!(dec, Decoder::StringToBytes(_, _)));
2560 for s in ["", "abc", "data"] {
2561 let data = encode_avro_bytes(s.as_bytes());
2562 let mut cur = AvroCursor::new(&data);
2563 dec.decode(&mut cur).unwrap();
2564 }
2565 let arr = dec.flush(None).unwrap();
2566 let a = arr.as_any().downcast_ref::<BinaryArray>().unwrap();
2567 assert_eq!(a.value(0), b"");
2568 assert_eq!(a.value(1), b"abc");
2569 assert_eq!(a.value(2), "data".as_bytes());
2570 }
2571
2572 #[test]
2573 fn test_schema_resolution_no_promotion_passthrough_int() {
2574 let ws = Schema::TypeName(TypeName::Primitive(PrimitiveType::Int));
2575 let rs = Schema::TypeName(TypeName::Primitive(PrimitiveType::Int));
2576 let writer_record = Schema::Complex(ComplexType::Record(Record {
2578 name: "Root",
2579 namespace: None,
2580 doc: None,
2581 aliases: vec![],
2582 fields: vec![Field {
2583 name: "v",
2584 r#type: ws,
2585 default: None,
2586 doc: None,
2587 aliases: vec![],
2588 }],
2589 attributes: Attributes::default(),
2590 }));
2591 let reader_record = Schema::Complex(ComplexType::Record(Record {
2592 name: "Root",
2593 namespace: None,
2594 doc: None,
2595 aliases: vec![],
2596 fields: vec![Field {
2597 name: "v",
2598 r#type: rs,
2599 default: None,
2600 doc: None,
2601 aliases: vec![],
2602 }],
2603 attributes: Attributes::default(),
2604 }));
2605 let field = AvroFieldBuilder::new(&writer_record)
2606 .with_reader_schema(&reader_record)
2607 .with_utf8view(false)
2608 .with_strict_mode(false)
2609 .build()
2610 .unwrap();
2611 let dt = match field.data_type().codec() {
2613 Codec::Struct(fields) => fields[0].data_type().clone(),
2614 other => panic!("expected wrapper struct, got {other:?}"),
2615 };
2616 let mut dec = Decoder::try_new(&dt).unwrap();
2617 assert!(matches!(dec, Decoder::Int32(_)));
2618 for v in [7, -9] {
2619 let data = encode_avro_int(v);
2620 let mut cur = AvroCursor::new(&data);
2621 dec.decode(&mut cur).unwrap();
2622 }
2623 let arr = dec.flush(None).unwrap();
2624 let a = arr.as_any().downcast_ref::<Int32Array>().unwrap();
2625 assert_eq!(a.value(0), 7);
2626 assert_eq!(a.value(1), -9);
2627 }
2628
2629 #[test]
2630 fn test_schema_resolution_illegal_promotion_int_to_boolean_errors() {
2631 let ws = Schema::TypeName(TypeName::Primitive(PrimitiveType::Int));
2632 let rs = Schema::TypeName(TypeName::Primitive(PrimitiveType::Boolean));
2633 let writer_record = Schema::Complex(ComplexType::Record(Record {
2634 name: "Root",
2635 namespace: None,
2636 doc: None,
2637 aliases: vec![],
2638 fields: vec![Field {
2639 name: "v",
2640 r#type: ws,
2641 default: None,
2642 doc: None,
2643 aliases: vec![],
2644 }],
2645 attributes: Attributes::default(),
2646 }));
2647 let reader_record = Schema::Complex(ComplexType::Record(Record {
2648 name: "Root",
2649 namespace: None,
2650 doc: None,
2651 aliases: vec![],
2652 fields: vec![Field {
2653 name: "v",
2654 r#type: rs,
2655 default: None,
2656 doc: None,
2657 aliases: vec![],
2658 }],
2659 attributes: Attributes::default(),
2660 }));
2661 let res = AvroFieldBuilder::new(&writer_record)
2662 .with_reader_schema(&reader_record)
2663 .with_utf8view(false)
2664 .with_strict_mode(false)
2665 .build();
2666 assert!(res.is_err(), "expected error for illegal promotion");
2667 }
2668
2669 #[test]
2670 fn test_map_decoding_one_entry() {
2671 let value_type = avro_from_codec(Codec::Utf8);
2672 let map_type = avro_from_codec(Codec::Map(Arc::new(value_type)));
2673 let mut decoder = Decoder::try_new(&map_type).unwrap();
2674 let mut data = Vec::new();
2676 data.extend_from_slice(&encode_avro_long(1));
2677 data.extend_from_slice(&encode_avro_bytes(b"hello")); data.extend_from_slice(&encode_avro_bytes(b"world")); data.extend_from_slice(&encode_avro_long(0));
2680 let mut cursor = AvroCursor::new(&data);
2681 decoder.decode(&mut cursor).unwrap();
2682 let array = decoder.flush(None).unwrap();
2683 let map_arr = array.as_any().downcast_ref::<MapArray>().unwrap();
2684 assert_eq!(map_arr.len(), 1); assert_eq!(map_arr.value_length(0), 1);
2686 let entries = map_arr.value(0);
2687 let struct_entries = entries.as_any().downcast_ref::<StructArray>().unwrap();
2688 assert_eq!(struct_entries.len(), 1);
2689 let key_arr = struct_entries
2690 .column_by_name("key")
2691 .unwrap()
2692 .as_any()
2693 .downcast_ref::<StringArray>()
2694 .unwrap();
2695 let val_arr = struct_entries
2696 .column_by_name("value")
2697 .unwrap()
2698 .as_any()
2699 .downcast_ref::<StringArray>()
2700 .unwrap();
2701 assert_eq!(key_arr.value(0), "hello");
2702 assert_eq!(val_arr.value(0), "world");
2703 }
2704
2705 #[test]
2706 fn test_map_decoding_empty() {
2707 let value_type = avro_from_codec(Codec::Utf8);
2708 let map_type = avro_from_codec(Codec::Map(Arc::new(value_type)));
2709 let mut decoder = Decoder::try_new(&map_type).unwrap();
2710 let data = encode_avro_long(0);
2711 decoder.decode(&mut AvroCursor::new(&data)).unwrap();
2712 let array = decoder.flush(None).unwrap();
2713 let map_arr = array.as_any().downcast_ref::<MapArray>().unwrap();
2714 assert_eq!(map_arr.len(), 1);
2715 assert_eq!(map_arr.value_length(0), 0);
2716 }
2717
2718 #[test]
2719 fn test_fixed_decoding() {
2720 let avro_type = avro_from_codec(Codec::Fixed(3));
2721 let mut decoder = Decoder::try_new(&avro_type).expect("Failed to create decoder");
2722
2723 let data1 = [1u8, 2, 3];
2724 let mut cursor1 = AvroCursor::new(&data1);
2725 decoder
2726 .decode(&mut cursor1)
2727 .expect("Failed to decode data1");
2728 assert_eq!(cursor1.position(), 3, "Cursor should advance by fixed size");
2729 let data2 = [4u8, 5, 6];
2730 let mut cursor2 = AvroCursor::new(&data2);
2731 decoder
2732 .decode(&mut cursor2)
2733 .expect("Failed to decode data2");
2734 assert_eq!(cursor2.position(), 3, "Cursor should advance by fixed size");
2735 let array = decoder.flush(None).expect("Failed to flush decoder");
2736 assert_eq!(array.len(), 2, "Array should contain two items");
2737 let fixed_size_binary_array = array
2738 .as_any()
2739 .downcast_ref::<FixedSizeBinaryArray>()
2740 .expect("Failed to downcast to FixedSizeBinaryArray");
2741 assert_eq!(
2742 fixed_size_binary_array.value_length(),
2743 3,
2744 "Fixed size of binary values should be 3"
2745 );
2746 assert_eq!(
2747 fixed_size_binary_array.value(0),
2748 &[1, 2, 3],
2749 "First item mismatch"
2750 );
2751 assert_eq!(
2752 fixed_size_binary_array.value(1),
2753 &[4, 5, 6],
2754 "Second item mismatch"
2755 );
2756 }
2757
2758 #[test]
2759 fn test_fixed_decoding_empty() {
2760 let avro_type = avro_from_codec(Codec::Fixed(5));
2761 let mut decoder = Decoder::try_new(&avro_type).expect("Failed to create decoder");
2762
2763 let array = decoder
2764 .flush(None)
2765 .expect("Failed to flush decoder for empty input");
2766
2767 assert_eq!(array.len(), 0, "Array should be empty");
2768 let fixed_size_binary_array = array
2769 .as_any()
2770 .downcast_ref::<FixedSizeBinaryArray>()
2771 .expect("Failed to downcast to FixedSizeBinaryArray for empty array");
2772
2773 assert_eq!(
2774 fixed_size_binary_array.value_length(),
2775 5,
2776 "Fixed size of binary values should be 5 as per type"
2777 );
2778 }
2779
2780 #[test]
2781 fn test_uuid_decoding() {
2782 let avro_type = avro_from_codec(Codec::Uuid);
2783 let mut decoder = Decoder::try_new(&avro_type).expect("Failed to create decoder");
2784 let uuid_str = "f81d4fae-7dec-11d0-a765-00a0c91e6bf6";
2785 let data = encode_avro_bytes(uuid_str.as_bytes());
2786 let mut cursor = AvroCursor::new(&data);
2787 decoder.decode(&mut cursor).expect("Failed to decode data");
2788 assert_eq!(
2789 cursor.position(),
2790 data.len(),
2791 "Cursor should advance by varint size + data size"
2792 );
2793 let array = decoder.flush(None).expect("Failed to flush decoder");
2794 let fixed_size_binary_array = array
2795 .as_any()
2796 .downcast_ref::<FixedSizeBinaryArray>()
2797 .expect("Array should be a FixedSizeBinaryArray");
2798 assert_eq!(fixed_size_binary_array.len(), 1);
2799 assert_eq!(fixed_size_binary_array.value_length(), 16);
2800 let expected_bytes = [
2801 0xf8, 0x1d, 0x4f, 0xae, 0x7d, 0xec, 0x11, 0xd0, 0xa7, 0x65, 0x00, 0xa0, 0xc9, 0x1e,
2802 0x6b, 0xf6,
2803 ];
2804 assert_eq!(fixed_size_binary_array.value(0), &expected_bytes);
2805 }
2806
2807 #[test]
2808 fn test_array_decoding() {
2809 let item_dt = avro_from_codec(Codec::Int32);
2810 let list_dt = avro_from_codec(Codec::List(Arc::new(item_dt)));
2811 let mut decoder = Decoder::try_new(&list_dt).unwrap();
2812 let mut row1 = Vec::new();
2813 row1.extend_from_slice(&encode_avro_long(2));
2814 row1.extend_from_slice(&encode_avro_int(10));
2815 row1.extend_from_slice(&encode_avro_int(20));
2816 row1.extend_from_slice(&encode_avro_long(0));
2817 let row2 = encode_avro_long(0);
2818 let mut cursor = AvroCursor::new(&row1);
2819 decoder.decode(&mut cursor).unwrap();
2820 let mut cursor2 = AvroCursor::new(&row2);
2821 decoder.decode(&mut cursor2).unwrap();
2822 let array = decoder.flush(None).unwrap();
2823 let list_arr = array.as_any().downcast_ref::<ListArray>().unwrap();
2824 assert_eq!(list_arr.len(), 2);
2825 let offsets = list_arr.value_offsets();
2826 assert_eq!(offsets, &[0, 2, 2]);
2827 let values = list_arr.values();
2828 let int_arr = values.as_primitive::<Int32Type>();
2829 assert_eq!(int_arr.len(), 2);
2830 assert_eq!(int_arr.value(0), 10);
2831 assert_eq!(int_arr.value(1), 20);
2832 }
2833
2834 #[test]
2835 fn test_array_decoding_with_negative_block_count() {
2836 let item_dt = avro_from_codec(Codec::Int32);
2837 let list_dt = avro_from_codec(Codec::List(Arc::new(item_dt)));
2838 let mut decoder = Decoder::try_new(&list_dt).unwrap();
2839 let mut data = encode_avro_long(-3);
2840 data.extend_from_slice(&encode_avro_long(12));
2841 data.extend_from_slice(&encode_avro_int(1));
2842 data.extend_from_slice(&encode_avro_int(2));
2843 data.extend_from_slice(&encode_avro_int(3));
2844 data.extend_from_slice(&encode_avro_long(0));
2845 let mut cursor = AvroCursor::new(&data);
2846 decoder.decode(&mut cursor).unwrap();
2847 let array = decoder.flush(None).unwrap();
2848 let list_arr = array.as_any().downcast_ref::<ListArray>().unwrap();
2849 assert_eq!(list_arr.len(), 1);
2850 assert_eq!(list_arr.value_length(0), 3);
2851 let values = list_arr.values().as_primitive::<Int32Type>();
2852 assert_eq!(values.len(), 3);
2853 assert_eq!(values.value(0), 1);
2854 assert_eq!(values.value(1), 2);
2855 assert_eq!(values.value(2), 3);
2856 }
2857
2858 #[test]
2859 fn test_nested_array_decoding() {
2860 let inner_ty = avro_from_codec(Codec::List(Arc::new(avro_from_codec(Codec::Int32))));
2861 let nested_ty = avro_from_codec(Codec::List(Arc::new(inner_ty.clone())));
2862 let mut decoder = Decoder::try_new(&nested_ty).unwrap();
2863 let mut buf = Vec::new();
2864 buf.extend(encode_avro_long(1));
2865 buf.extend(encode_avro_long(2));
2866 buf.extend(encode_avro_int(5));
2867 buf.extend(encode_avro_int(6));
2868 buf.extend(encode_avro_long(0));
2869 buf.extend(encode_avro_long(0));
2870 let mut cursor = AvroCursor::new(&buf);
2871 decoder.decode(&mut cursor).unwrap();
2872 let arr = decoder.flush(None).unwrap();
2873 let outer = arr.as_any().downcast_ref::<ListArray>().unwrap();
2874 assert_eq!(outer.len(), 1);
2875 assert_eq!(outer.value_length(0), 1);
2876 let inner = outer.values().as_any().downcast_ref::<ListArray>().unwrap();
2877 assert_eq!(inner.len(), 1);
2878 assert_eq!(inner.value_length(0), 2);
2879 let values = inner
2880 .values()
2881 .as_any()
2882 .downcast_ref::<Int32Array>()
2883 .unwrap();
2884 assert_eq!(values.values(), &[5, 6]);
2885 }
2886
2887 #[test]
2888 fn test_array_decoding_empty_array() {
2889 let value_type = avro_from_codec(Codec::Utf8);
2890 let map_type = avro_from_codec(Codec::List(Arc::new(value_type)));
2891 let mut decoder = Decoder::try_new(&map_type).unwrap();
2892 let data = encode_avro_long(0);
2893 decoder.decode(&mut AvroCursor::new(&data)).unwrap();
2894 let array = decoder.flush(None).unwrap();
2895 let list_arr = array.as_any().downcast_ref::<ListArray>().unwrap();
2896 assert_eq!(list_arr.len(), 1);
2897 assert_eq!(list_arr.value_length(0), 0);
2898 }
2899
2900 #[test]
2901 fn test_decimal_decoding_fixed256() {
2902 let dt = avro_from_codec(Codec::Decimal(50, Some(2), Some(32)));
2903 let mut decoder = Decoder::try_new(&dt).unwrap();
2904 let row1 = [
2905 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
2906 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
2907 0x00, 0x00, 0x30, 0x39,
2908 ];
2909 let row2 = [
2910 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
2911 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
2912 0xFF, 0xFF, 0xFF, 0x85,
2913 ];
2914 let mut data = Vec::new();
2915 data.extend_from_slice(&row1);
2916 data.extend_from_slice(&row2);
2917 let mut cursor = AvroCursor::new(&data);
2918 decoder.decode(&mut cursor).unwrap();
2919 decoder.decode(&mut cursor).unwrap();
2920 let arr = decoder.flush(None).unwrap();
2921 let dec = arr.as_any().downcast_ref::<Decimal256Array>().unwrap();
2922 assert_eq!(dec.len(), 2);
2923 assert_eq!(dec.value_as_string(0), "123.45");
2924 assert_eq!(dec.value_as_string(1), "-1.23");
2925 }
2926
2927 #[test]
2928 fn test_decimal_decoding_fixed128() {
2929 let dt = avro_from_codec(Codec::Decimal(28, Some(2), Some(16)));
2930 let mut decoder = Decoder::try_new(&dt).unwrap();
2931 let row1 = [
2932 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
2933 0x30, 0x39,
2934 ];
2935 let row2 = [
2936 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
2937 0xFF, 0x85,
2938 ];
2939 let mut data = Vec::new();
2940 data.extend_from_slice(&row1);
2941 data.extend_from_slice(&row2);
2942 let mut cursor = AvroCursor::new(&data);
2943 decoder.decode(&mut cursor).unwrap();
2944 decoder.decode(&mut cursor).unwrap();
2945 let arr = decoder.flush(None).unwrap();
2946 let dec = arr.as_any().downcast_ref::<Decimal128Array>().unwrap();
2947 assert_eq!(dec.len(), 2);
2948 assert_eq!(dec.value_as_string(0), "123.45");
2949 assert_eq!(dec.value_as_string(1), "-1.23");
2950 }
2951
2952 #[test]
2953 fn test_decimal_decoding_fixed32_from_32byte_fixed_storage() {
2954 let dt = avro_from_codec(Codec::Decimal(5, Some(2), Some(32)));
2955 let mut decoder = Decoder::try_new(&dt).unwrap();
2956 let row1 = [
2957 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
2958 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
2959 0x00, 0x00, 0x30, 0x39,
2960 ];
2961 let row2 = [
2962 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
2963 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
2964 0xFF, 0xFF, 0xFF, 0x85,
2965 ];
2966 let mut data = Vec::new();
2967 data.extend_from_slice(&row1);
2968 data.extend_from_slice(&row2);
2969 let mut cursor = AvroCursor::new(&data);
2970 decoder.decode(&mut cursor).unwrap();
2971 decoder.decode(&mut cursor).unwrap();
2972 let arr = decoder.flush(None).unwrap();
2973 #[cfg(feature = "small_decimals")]
2974 {
2975 let dec = arr.as_any().downcast_ref::<Decimal32Array>().unwrap();
2976 assert_eq!(dec.len(), 2);
2977 assert_eq!(dec.value_as_string(0), "123.45");
2978 assert_eq!(dec.value_as_string(1), "-1.23");
2979 }
2980 #[cfg(not(feature = "small_decimals"))]
2981 {
2982 let dec = arr.as_any().downcast_ref::<Decimal128Array>().unwrap();
2983 assert_eq!(dec.len(), 2);
2984 assert_eq!(dec.value_as_string(0), "123.45");
2985 assert_eq!(dec.value_as_string(1), "-1.23");
2986 }
2987 }
2988
2989 #[test]
2990 fn test_decimal_decoding_fixed32_from_16byte_fixed_storage() {
2991 let dt = avro_from_codec(Codec::Decimal(5, Some(2), Some(16)));
2992 let mut decoder = Decoder::try_new(&dt).unwrap();
2993 let row1 = [
2994 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
2995 0x30, 0x39,
2996 ];
2997 let row2 = [
2998 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
2999 0xFF, 0x85,
3000 ];
3001 let mut data = Vec::new();
3002 data.extend_from_slice(&row1);
3003 data.extend_from_slice(&row2);
3004 let mut cursor = AvroCursor::new(&data);
3005 decoder.decode(&mut cursor).unwrap();
3006 decoder.decode(&mut cursor).unwrap();
3007
3008 let arr = decoder.flush(None).unwrap();
3009 #[cfg(feature = "small_decimals")]
3010 {
3011 let dec = arr.as_any().downcast_ref::<Decimal32Array>().unwrap();
3012 assert_eq!(dec.len(), 2);
3013 assert_eq!(dec.value_as_string(0), "123.45");
3014 assert_eq!(dec.value_as_string(1), "-1.23");
3015 }
3016 #[cfg(not(feature = "small_decimals"))]
3017 {
3018 let dec = arr.as_any().downcast_ref::<Decimal128Array>().unwrap();
3019 assert_eq!(dec.len(), 2);
3020 assert_eq!(dec.value_as_string(0), "123.45");
3021 assert_eq!(dec.value_as_string(1), "-1.23");
3022 }
3023 }
3024
/// Decodes a nullable, bytes-backed decimal(4, 1) column and checks validity and values.
#[test]
fn test_decimal_decoding_bytes_with_nulls() {
    // The concrete Arrow array type depends on whether narrow decimals are compiled in.
    #[cfg(feature = "small_decimals")]
    type ExpectedArray = Decimal32Array;
    #[cfg(not(feature = "small_decimals"))]
    type ExpectedArray = Decimal128Array;

    let value_decoder =
        Decoder::try_new(&avro_from_codec(Codec::Decimal(4, Some(1), None))).unwrap();
    let mut decoder = Decoder::Nullable(
        Nullability::NullSecond,
        NullBufferBuilder::new(DEFAULT_CAPACITY),
        Box::new(value_decoder),
        NullablePlan::ReadTag,
    );

    // `Some` rows are written as tag 0 + length-prefixed big-endian bytes,
    // `None` rows as a bare tag 1.
    let rows: [Option<[u8; 2]>; 3] = [Some([0x04, 0xD2]), None, Some([0xFB, 0x2E])];
    let mut data = Vec::new();
    for row in &rows {
        match row {
            Some(unscaled) => {
                data.extend_from_slice(&encode_avro_int(0));
                data.extend_from_slice(&encode_avro_bytes(unscaled));
            }
            None => data.extend_from_slice(&encode_avro_int(1)),
        }
    }

    let mut cursor = AvroCursor::new(&data);
    for _ in 0..rows.len() {
        decoder.decode(&mut cursor).unwrap();
    }

    let arr = decoder.flush(None).unwrap();
    let dec_arr = arr.as_any().downcast_ref::<ExpectedArray>().unwrap();
    assert_eq!(dec_arr.len(), 3);
    assert!(dec_arr.is_valid(0));
    assert!(!dec_arr.is_valid(1));
    assert!(dec_arr.is_valid(2));
    assert_eq!(dec_arr.value_as_string(0), "123.4");
    assert_eq!(dec_arr.value_as_string(2), "-123.4");
}
3067
/// Decodes a nullable decimal(6, 2) stored as 16-byte fixed values and checks the
/// resulting validity and values.
#[test]
fn test_decimal_decoding_bytes_with_nulls_fixed_size_narrow_result() {
    // The concrete Arrow array type depends on whether narrow decimals are compiled in.
    #[cfg(feature = "small_decimals")]
    type ExpectedArray = Decimal32Array;
    #[cfg(not(feature = "small_decimals"))]
    type ExpectedArray = Decimal128Array;

    // 16-byte big-endian two's-complement encodings of +123456 and -123456.
    const POSITIVE: [u8; 16] = [
        0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0xE2,
        0x40,
    ];
    const NEGATIVE: [u8; 16] = [
        0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFE, 0x1D,
        0xC0,
    ];

    let value_decoder =
        Decoder::try_new(&avro_from_codec(Codec::Decimal(6, Some(2), Some(16)))).unwrap();
    let mut decoder = Decoder::Nullable(
        Nullability::NullSecond,
        NullBufferBuilder::new(DEFAULT_CAPACITY),
        Box::new(value_decoder),
        NullablePlan::ReadTag,
    );

    // `Some` rows are written as tag 0 + the raw fixed bytes (no length prefix),
    // `None` rows as a bare tag 1.
    let rows = [Some(POSITIVE), None, Some(NEGATIVE)];
    let mut data = Vec::new();
    for row in &rows {
        match row {
            Some(fixed) => {
                data.extend_from_slice(&encode_avro_int(0));
                data.extend_from_slice(fixed);
            }
            None => data.extend_from_slice(&encode_avro_int(1)),
        }
    }

    let mut cursor = AvroCursor::new(&data);
    for _ in 0..rows.len() {
        decoder.decode(&mut cursor).unwrap();
    }

    let arr = decoder.flush(None).unwrap();
    let dec_arr = arr.as_any().downcast_ref::<ExpectedArray>().unwrap();
    assert_eq!(dec_arr.len(), 3);
    assert!(dec_arr.is_valid(0));
    assert!(!dec_arr.is_valid(1));
    assert!(dec_arr.is_valid(2));
    assert_eq!(dec_arr.value_as_string(0), "1234.56");
    assert_eq!(dec_arr.value_as_string(2), "-1234.56");
}
3118
/// Decodes three enum ordinals and checks the dictionary keys and symbol values.
#[test]
fn test_enum_decoding() {
    let symbols: Arc<[String]> = ["A", "B", "C"].into_iter().map(String::from).collect();
    let mut decoder = Decoder::try_new(&avro_from_codec(Codec::Enum(symbols.clone()))).unwrap();

    // Ordinals are written in a non-sorted order so key order is observable.
    let mut data = Vec::new();
    for ordinal in [2, 0, 1] {
        data.extend_from_slice(&encode_avro_int(ordinal));
    }
    let mut cursor = AvroCursor::new(&data);
    for _ in 0..3 {
        decoder.decode(&mut cursor).unwrap();
    }

    let array = decoder.flush(None).unwrap();
    let dict_array = array
        .as_any()
        .downcast_ref::<DictionaryArray<Int32Type>>()
        .unwrap();
    assert_eq!(dict_array.len(), 3);

    let values = dict_array
        .values()
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    for (idx, expected) in ["A", "B", "C"].into_iter().enumerate() {
        assert_eq!(values.value(idx), expected);
    }
    assert_eq!(dict_array.keys().values(), &[2, 0, 1]);
}
3148
/// Decodes a null-first nullable enum and checks that nulls land in the dictionary keys.
#[test]
fn test_enum_decoding_with_nulls() {
    let symbols: Arc<[String]> = ["X", "Y"].into_iter().map(String::from).collect();
    let avro_type = AvroDataType::new(
        Codec::Enum(symbols.clone()),
        Default::default(),
        Some(Nullability::NullFirst),
    );
    let mut decoder = Decoder::try_new(&avro_type).unwrap();

    // `Some(ordinal)` rows are written as tag 1 + ordinal, `None` rows as a bare tag 0.
    let rows = [Some(1), None, Some(0)];
    let mut data = Vec::new();
    for row in rows {
        match row {
            Some(ordinal) => {
                data.extend_from_slice(&encode_avro_long(1));
                data.extend_from_slice(&encode_avro_int(ordinal));
            }
            None => data.extend_from_slice(&encode_avro_long(0)),
        }
    }

    let mut cursor = AvroCursor::new(&data);
    for _ in 0..3 {
        decoder.decode(&mut cursor).unwrap();
    }

    let array = decoder.flush(None).unwrap();
    let dict_array = array
        .as_any()
        .downcast_ref::<DictionaryArray<Int32Type>>()
        .unwrap();
    assert_eq!(dict_array.len(), 3);
    assert!(dict_array.is_valid(0));
    assert!(dict_array.is_null(1));
    assert!(dict_array.is_valid(2));

    let expected_keys = Int32Array::from(vec![Some(1), None, Some(0)]);
    assert_eq!(dict_array.keys(), &expected_keys);

    let values = dict_array
        .values()
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    assert_eq!(values.value(0), "X");
    assert_eq!(values.value(1), "Y");
}
3185
/// Decodes a null-first nullable interval column: value, null, value.
#[test]
fn test_duration_decoding_with_nulls() {
    // Lays out three u32 components back to back in little-endian order (12 bytes).
    fn duration_bytes(parts: [u32; 3]) -> Vec<u8> {
        let mut out = Vec::new();
        for part in parts {
            out.extend_from_slice(&part.to_le_bytes());
        }
        out
    }

    let avro_type = AvroDataType::new(
        Codec::Interval,
        Default::default(),
        Some(Nullability::NullFirst),
    );
    let mut decoder = Decoder::try_new(&avro_type).unwrap();

    // Tag 1 precedes a value payload; a bare tag 0 is the null row.
    let mut data = Vec::new();
    data.extend_from_slice(&encode_avro_long(1));
    data.extend_from_slice(&duration_bytes([1, 2, 3]));
    data.extend_from_slice(&encode_avro_long(0));
    data.extend_from_slice(&encode_avro_long(1));
    data.extend_from_slice(&duration_bytes([4, 5, 6]));

    let mut cursor = AvroCursor::new(&data);
    for _ in 0..3 {
        decoder.decode(&mut cursor).unwrap();
    }

    let array = decoder.flush(None).unwrap();
    let interval_array = array
        .as_any()
        .downcast_ref::<IntervalMonthDayNanoArray>()
        .unwrap();
    assert_eq!(interval_array.len(), 3);
    assert!(interval_array.is_valid(0));
    assert!(interval_array.is_null(1));
    assert!(interval_array.is_valid(2));

    // The third encoded component (3 and 6) is expected back scaled by 1_000_000.
    let interval = |months: i32, days: i32, nanoseconds: i64| IntervalMonthDayNano {
        months,
        days,
        nanoseconds,
    };
    let expected = IntervalMonthDayNanoArray::from(vec![
        Some(interval(1, 2, 3_000_000)),
        None,
        Some(interval(4, 5, 6_000_000)),
    ]);
    assert_eq!(interval_array, &expected);
}
3239
/// Flushing an interval decoder that never decoded a row yields an empty array.
#[test]
fn test_duration_decoding_empty() {
    let avro_type = AvroDataType::new(Codec::Interval, Default::default(), None);
    let mut decoder = Decoder::try_new(&avro_type).unwrap();
    // No `decode` calls on purpose: flush straight away.
    let flushed = decoder.flush(None).unwrap();
    assert_eq!(flushed.len(), 0);
}
3248
/// Decodes three longs through the `DurationSeconds` codec, including a negative value.
#[test]
#[cfg(feature = "avro_custom_types")]
fn test_duration_seconds_decoding() {
    let avro_type = AvroDataType::new(Codec::DurationSeconds, Default::default(), None);
    let mut decoder = Decoder::try_new(&avro_type).unwrap();

    let mut data = Vec::new();
    for secs in [0i64, -1, 2] {
        data.extend_from_slice(&encode_avro_long(secs));
    }
    let mut cursor = AvroCursor::new(&data);
    for _ in 0..3 {
        decoder.decode(&mut cursor).unwrap();
    }

    let array = decoder.flush(None).unwrap();
    let dur = array
        .as_any()
        .downcast_ref::<DurationSecondArray>()
        .unwrap();
    assert_eq!(dur.values(), &[0, -1, 2]);
}
3270
/// Decodes three longs through the `DurationMillis` codec, including a negative value.
#[test]
#[cfg(feature = "avro_custom_types")]
fn test_duration_milliseconds_decoding() {
    let avro_type = AvroDataType::new(Codec::DurationMillis, Default::default(), None);
    let mut decoder = Decoder::try_new(&avro_type).unwrap();

    // Encode each input in order into one contiguous buffer.
    let mut data = Vec::new();
    data.extend_from_slice(&encode_avro_long(1));
    data.extend_from_slice(&encode_avro_long(0));
    data.extend_from_slice(&encode_avro_long(-2));

    let mut cursor = AvroCursor::new(&data);
    decoder.decode(&mut cursor).unwrap();
    decoder.decode(&mut cursor).unwrap();
    decoder.decode(&mut cursor).unwrap();

    let flushed = decoder.flush(None).unwrap();
    let millis = flushed
        .as_any()
        .downcast_ref::<DurationMillisecondArray>()
        .unwrap();
    assert_eq!(millis.values(), &[1, 0, -2]);
}
3291
/// Decodes three longs through the `DurationMicros` codec, including a negative value.
#[test]
#[cfg(feature = "avro_custom_types")]
fn test_duration_microseconds_decoding() {
    let avro_type = AvroDataType::new(Codec::DurationMicros, Default::default(), None);
    let mut decoder = Decoder::try_new(&avro_type).unwrap();

    // Encode each input in order into one contiguous buffer.
    let mut data = Vec::new();
    data.extend_from_slice(&encode_avro_long(5));
    data.extend_from_slice(&encode_avro_long(-6));
    data.extend_from_slice(&encode_avro_long(7));

    let mut cursor = AvroCursor::new(&data);
    decoder.decode(&mut cursor).unwrap();
    decoder.decode(&mut cursor).unwrap();
    decoder.decode(&mut cursor).unwrap();

    let flushed = decoder.flush(None).unwrap();
    let micros = flushed
        .as_any()
        .downcast_ref::<DurationMicrosecondArray>()
        .unwrap();
    assert_eq!(micros.values(), &[5, -6, 7]);
}
3312
/// Decodes three longs through the `DurationNanos` codec, including a negative value.
#[test]
#[cfg(feature = "avro_custom_types")]
fn test_duration_nanoseconds_decoding() {
    let avro_type = AvroDataType::new(Codec::DurationNanos, Default::default(), None);
    let mut decoder = Decoder::try_new(&avro_type).unwrap();

    // Encode each input in order into one contiguous buffer.
    let mut data = Vec::new();
    data.extend_from_slice(&encode_avro_long(8));
    data.extend_from_slice(&encode_avro_long(9));
    data.extend_from_slice(&encode_avro_long(-10));

    let mut cursor = AvroCursor::new(&data);
    decoder.decode(&mut cursor).unwrap();
    decoder.decode(&mut cursor).unwrap();
    decoder.decode(&mut cursor).unwrap();

    let flushed = decoder.flush(None).unwrap();
    let nanos = flushed
        .as_any()
        .downcast_ref::<DurationNanosecondArray>()
        .unwrap();
    assert_eq!(nanos.values(), &[8, 9, -10]);
}
3333
/// A failed decode on a nullable column must not leave a stray entry behind: after
/// one good row, one failing row and another good row, exactly two rows are flushed.
#[test]
fn test_nullable_decode_error_bitmap_corruption() {
    let nullable_int = AvroDataType::new(
        Codec::Int32,
        Default::default(),
        Some(Nullability::NullSecond),
    );
    let mut decoder = Decoder::try_new(&nullable_int).unwrap();

    // Bare tag 1: decodes successfully and is asserted to be null below.
    let null_row = encode_avro_int(1);
    // Tag 0 with nothing after it: the decode is expected to fail.
    let truncated_row = encode_avro_int(0);
    // Tag 0 followed by the int 42.
    let mut value_row = Vec::new();
    value_row.extend_from_slice(&encode_avro_int(0));
    value_row.extend_from_slice(&encode_avro_int(42));

    decoder.decode(&mut AvroCursor::new(&null_row)).unwrap();
    let failed = decoder.decode(&mut AvroCursor::new(&truncated_row));
    assert!(failed.is_err());
    decoder.decode(&mut AvroCursor::new(&value_row)).unwrap();

    let array = decoder.flush(None).unwrap();

    // Only the two successful rows may be present.
    assert_eq!(array.len(), 2);
    let int_array = array.as_any().downcast_ref::<Int32Array>().unwrap();
    assert!(int_array.is_null(0));
    assert_eq!(int_array.value(1), 42);
}
3369
/// With an enum resolution mapping of `[2, 0, 1]`, encoded ordinals 0, 1, 2 must come
/// out as dictionary keys 2, 0, 1 over the reader's symbol order.
#[test]
fn test_enum_mapping_reordered_symbols() {
    let reader_symbols: Arc<[String]> = ["B", "C", "A"].into_iter().map(String::from).collect();
    let resolution = EnumResolution {
        mapping: Arc::from(vec![2, 0, 1]),
        // -1 here means no default is configured for this test.
        default_index: -1,
    };
    let mut dec = Decoder::Enum(
        Vec::with_capacity(DEFAULT_CAPACITY),
        reader_symbols.clone(),
        Some(resolution),
    );

    let mut data = Vec::new();
    for ordinal in 0..3 {
        data.extend_from_slice(&encode_avro_int(ordinal));
    }
    let mut cur = AvroCursor::new(&data);
    for _ in 0..3 {
        dec.decode(&mut cur).unwrap();
    }

    let arr = dec.flush(None).unwrap();
    let dict = arr
        .as_any()
        .downcast_ref::<DictionaryArray<Int32Type>>()
        .unwrap();
    let expected_keys = Int32Array::from(vec![2, 0, 1]);
    assert_eq!(dict.keys(), &expected_keys);

    let values = dict
        .values()
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    for (idx, expected) in ["B", "C", "A"].into_iter().enumerate() {
        assert_eq!(values.value(idx), expected);
    }
}
3408
/// An ordinal outside the mapping (99) must resolve to the configured default index.
#[test]
fn test_enum_mapping_unknown_symbol_and_out_of_range_fall_back_to_default() {
    let reader_symbols: Arc<[String]> = ["A", "B"].into_iter().map(String::from).collect();
    let resolution = EnumResolution {
        mapping: Arc::from(vec![0, 1]),
        default_index: 1,
    };
    let mut dec = Decoder::Enum(
        Vec::with_capacity(DEFAULT_CAPACITY),
        reader_symbols.clone(),
        Some(resolution),
    );

    // Two in-range ordinals followed by one far beyond the mapping length.
    let mut data = Vec::new();
    for ordinal in [0, 1, 99] {
        data.extend_from_slice(&encode_avro_int(ordinal));
    }
    let mut cur = AvroCursor::new(&data);
    for _ in 0..3 {
        dec.decode(&mut cur).unwrap();
    }

    let arr = dec.flush(None).unwrap();
    let dict = arr
        .as_any()
        .downcast_ref::<DictionaryArray<Int32Type>>()
        .unwrap();
    let expected_keys = Int32Array::from(vec![0, 1, 1]);
    assert_eq!(dict.keys(), &expected_keys);

    let values = dict
        .values()
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    for (idx, expected) in ["A", "B"].into_iter().enumerate() {
        assert_eq!(values.value(idx), expected);
    }
}
3445
/// When the mapping entry is -1 and the default index is -1, decoding must fail with
/// a message mentioning both the unresolved symbol and the missing default.
#[test]
fn test_enum_mapping_unknown_symbol_without_default_errors() {
    let reader_symbols: Arc<[String]> = vec!["A".to_string()].into();
    let resolution = EnumResolution {
        mapping: Arc::from(vec![-1]),
        default_index: -1,
    };
    let mut dec = Decoder::Enum(
        Vec::with_capacity(DEFAULT_CAPACITY),
        reader_symbols,
        Some(resolution),
    );

    let data = encode_avro_int(0);
    let mut cur = AvroCursor::new(&data);
    let msg = dec
        .decode(&mut cur)
        .expect_err("expected decode error for unresolved enum without default")
        .to_string();

    let mentions_both = ["not resolvable", "no default"]
        .into_iter()
        .all(|needle| msg.contains(needle));
    assert!(mentions_both, "unexpected error message: {msg}");
}
3470
3471 fn make_record_resolved_decoder(
3472 reader_fields: &[(&str, DataType, bool)],
3473 writer_to_reader: Vec<Option<usize>>,
3474 skip_decoders: Vec<Option<Skipper>>,
3475 ) -> Decoder {
3476 let mut field_refs: Vec<FieldRef> = Vec::with_capacity(reader_fields.len());
3477 let mut encodings: Vec<Decoder> = Vec::with_capacity(reader_fields.len());
3478 for (name, dt, nullable) in reader_fields {
3479 field_refs.push(Arc::new(ArrowField::new(*name, dt.clone(), *nullable)));
3480 let enc = match dt {
3481 DataType::Int32 => Decoder::Int32(Vec::new()),
3482 DataType::Int64 => Decoder::Int64(Vec::new()),
3483 DataType::Utf8 => {
3484 Decoder::String(OffsetBufferBuilder::new(DEFAULT_CAPACITY), Vec::new())
3485 }
3486 other => panic!("Unsupported test reader field type: {other:?}"),
3487 };
3488 encodings.push(enc);
3489 }
3490 let fields: Fields = field_refs.into();
3491 Decoder::Record(
3492 fields,
3493 encodings,
3494 Some(Projector {
3495 writer_to_reader: Arc::from(writer_to_reader),
3496 skip_decoders,
3497 field_defaults: vec![None; reader_fields.len()],
3498 default_injections: Arc::from(Vec::<(usize, AvroLiteral)>::new()),
3499 }),
3500 )
3501 }
3502
/// A trailing writer-only int field is skipped: the cursor consumes it but only
/// `id` is materialised.
#[test]
fn test_skip_writer_trailing_field_int32() {
    let mut dec = make_record_resolved_decoder(
        &[("id", DataType::Int32, false)],
        vec![Some(0), None],
        vec![None, Some(Skipper::Int32)],
    );

    // First int is `id`; the second belongs to the skipped writer field.
    let mut data = Vec::new();
    for v in [7, 999] {
        data.extend_from_slice(&encode_avro_int(v));
    }
    let mut cur = AvroCursor::new(&data);
    dec.decode(&mut cur).unwrap();
    // The skipped field's bytes must have been consumed as well.
    assert_eq!(cur.position(), data.len());

    let arr = dec.flush(None).unwrap();
    let struct_arr = arr.as_any().downcast_ref::<StructArray>().unwrap();
    assert_eq!(struct_arr.len(), 1);
    let id_col = struct_arr.column_by_name("id").unwrap();
    let id = id_col.as_any().downcast_ref::<Int32Array>().unwrap();
    assert_eq!(id.value(0), 7);
}
3527
/// A writer-only string field sitting between two projected fields is skipped
/// without disturbing the fields around it.
#[test]
fn test_skip_writer_middle_field_string() {
    let reader_fields = [
        ("id", DataType::Int32, false),
        ("score", DataType::Int64, false),
    ];
    let mut dec = make_record_resolved_decoder(
        &reader_fields,
        vec![Some(0), None, Some(1)],
        vec![None, Some(Skipper::String), None],
    );

    // Writer layout: id (int), skipped string, score (long).
    let mut data = encode_avro_int(42);
    data.extend_from_slice(&encode_avro_bytes(b"abcdef"));
    data.extend_from_slice(&encode_avro_long(1000));

    let mut cur = AvroCursor::new(&data);
    dec.decode(&mut cur).unwrap();
    assert_eq!(cur.position(), data.len());

    let arr = dec.flush(None).unwrap();
    let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
    let id_col = s.column_by_name("id").unwrap();
    let id = id_col.as_any().downcast_ref::<Int32Array>().unwrap();
    let score_col = s.column_by_name("score").unwrap();
    let score = score_col.as_any().downcast_ref::<Int64Array>().unwrap();
    assert_eq!(id.value(0), 42);
    assert_eq!(score.value(0), 1000);
}
3562
/// Skips a writer-only array whose single block is written with a negative item
/// count followed by its byte length, then reads the projected `id`.
#[test]
fn test_skip_writer_array_with_negative_block_count_fast() {
    let mut dec = make_record_resolved_decoder(
        &[("id", DataType::Int32, false)],
        vec![None, Some(0)],
        vec![Some(Skipper::List(Box::new(Skipper::Int32))), None],
    );

    // Three encoded ints form the block payload.
    let mut array_payload = Vec::new();
    for item in 1..=3 {
        array_payload.extend_from_slice(&encode_avro_int(item));
    }

    // Block header (-3 items, byte size), payload, zero terminator, then `id` = 5.
    let mut data = encode_avro_long(-3);
    data.extend_from_slice(&encode_avro_long(array_payload.len() as i64));
    data.extend_from_slice(&array_payload);
    data.extend_from_slice(&encode_avro_long(0));
    data.extend_from_slice(&encode_avro_int(5));

    let mut cur = AvroCursor::new(&data);
    dec.decode(&mut cur).unwrap();
    assert_eq!(cur.position(), data.len());

    let arr = dec.flush(None).unwrap();
    let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
    let id_col = s.column_by_name("id").unwrap();
    let id = id_col.as_any().downcast_ref::<Int32Array>().unwrap();
    assert_eq!(id.len(), 1);
    assert_eq!(id.value(0), 5);
}
3594
/// Skips a writer-only map whose single block is written with a negative entry
/// count followed by its byte length, then reads the projected `id`.
#[test]
fn test_skip_writer_map_with_negative_block_count_fast() {
    let mut dec = make_record_resolved_decoder(
        &[("id", DataType::Int32, false)],
        vec![None, Some(0)],
        vec![Some(Skipper::Map(Box::new(Skipper::Int32))), None],
    );

    // Each entry is a length-prefixed key followed by an int value.
    let pairs: [(&[u8], i32); 2] = [(b"k1", 10), (b"k2", 20)];
    let mut entries = Vec::new();
    for (key, value) in pairs {
        entries.extend_from_slice(&encode_avro_bytes(key));
        entries.extend_from_slice(&encode_avro_int(value));
    }

    // Block header (-2 entries, byte size), payload, zero terminator, then `id` = 123.
    let mut data = encode_avro_long(-2);
    data.extend_from_slice(&encode_avro_long(entries.len() as i64));
    data.extend_from_slice(&entries);
    data.extend_from_slice(&encode_avro_long(0));
    data.extend_from_slice(&encode_avro_int(123));

    let mut cur = AvroCursor::new(&data);
    dec.decode(&mut cur).unwrap();
    assert_eq!(cur.position(), data.len());

    let arr = dec.flush(None).unwrap();
    let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
    let id_col = s.column_by_name("id").unwrap();
    let id = id_col.as_any().downcast_ref::<Int32Array>().unwrap();
    assert_eq!(id.len(), 1);
    assert_eq!(id.value(0), 123);
}
3627
/// Skips a writer-only null-first nullable int in both its null and non-null form,
/// checking that each row's bytes are fully consumed.
#[test]
fn test_skip_writer_nullable_field_union_nullfirst() {
    let nullable_int_skipper = Skipper::Nullable(Nullability::NullFirst, Box::new(Skipper::Int32));
    let mut dec = make_record_resolved_decoder(
        &[("id", DataType::Int32, false)],
        vec![None, Some(0)],
        vec![Some(nullable_int_skipper), None],
    );

    // Row 1: skipped field is a bare tag 0, then `id` = 5.
    let mut row1 = encode_avro_long(0);
    row1.extend_from_slice(&encode_avro_int(5));
    // Row 2: skipped field is tag 1 + the int 123, then `id` = 7.
    let mut row2 = encode_avro_long(1);
    row2.extend_from_slice(&encode_avro_int(123));
    row2.extend_from_slice(&encode_avro_int(7));

    let mut cur1 = AvroCursor::new(&row1);
    let mut cur2 = AvroCursor::new(&row2);
    for cur in [&mut cur1, &mut cur2] {
        dec.decode(cur).unwrap();
    }
    assert_eq!(cur1.position(), row1.len());
    assert_eq!(cur2.position(), row2.len());

    let arr = dec.flush(None).unwrap();
    let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
    let id_col = s.column_by_name("id").unwrap();
    let id = id_col.as_any().downcast_ref::<Int32Array>().unwrap();
    assert_eq!(id.len(), 2);
    assert_eq!(id.value(0), 5);
    assert_eq!(id.value(1), 7);
}
3666
3667 fn make_dense_union_avro(
3668 children: Vec<(Codec, &'_ str, DataType)>,
3669 type_ids: Vec<i8>,
3670 ) -> AvroDataType {
3671 let mut avro_children: Vec<AvroDataType> = Vec::with_capacity(children.len());
3672 let mut fields: Vec<arrow_schema::Field> = Vec::with_capacity(children.len());
3673 for (codec, name, dt) in children.into_iter() {
3674 avro_children.push(AvroDataType::new(codec, Default::default(), None));
3675 fields.push(arrow_schema::Field::new(name, dt, true));
3676 }
3677 let union_fields = UnionFields::new(type_ids, fields);
3678 let union_codec = Codec::Union(avro_children.into(), union_fields, UnionMode::Dense);
3679 AvroDataType::new(union_codec, Default::default(), None)
3680 }
3681
/// Decodes int / string / int through a dense union with custom type ids (2 and 5)
/// and checks type ids, per-child offsets and child contents.
#[test]
fn test_union_dense_two_children_custom_type_ids() {
    let union_dt = make_dense_union_avro(
        vec![
            (Codec::Int32, "i", DataType::Int32),
            (Codec::Utf8, "s", DataType::Utf8),
        ],
        vec![2, 5],
    );
    let mut dec = Decoder::try_new(&union_dt).unwrap();

    // Each row is a branch index followed by that branch's payload.
    let mut int_seven = encode_avro_long(0);
    int_seven.extend_from_slice(&encode_avro_int(7));
    let mut str_x = encode_avro_long(1);
    str_x.extend_from_slice(&encode_avro_bytes(b"x"));
    let mut int_minus_one = encode_avro_long(0);
    int_minus_one.extend_from_slice(&encode_avro_int(-1));
    for row in [&int_seven, &str_x, &int_minus_one] {
        dec.decode(&mut AvroCursor::new(row)).unwrap();
    }

    let array = dec.flush(None).unwrap();
    let ua = array
        .as_any()
        .downcast_ref::<UnionArray>()
        .expect("expected UnionArray");
    assert_eq!(ua.len(), 3);

    // (type id, offset into that child) expected for rows 0..3.
    let expected_slots = [(2, 0), (5, 0), (2, 1)];
    for (row, (type_id, _)) in expected_slots.iter().enumerate() {
        assert_eq!(ua.type_id(row), *type_id);
    }
    for (row, (_, offset)) in expected_slots.iter().enumerate() {
        assert_eq!(ua.value_offset(row), *offset);
    }

    let int_child = ua
        .child(2)
        .as_any()
        .downcast_ref::<Int32Array>()
        .expect("int child");
    assert_eq!(int_child.len(), 2);
    assert_eq!(int_child.value(0), 7);
    assert_eq!(int_child.value(1), -1);

    let str_child = ua
        .child(5)
        .as_any()
        .downcast_ref::<StringArray>()
        .expect("string child");
    assert_eq!(str_child.len(), 1);
    assert_eq!(str_child.value(0), "x");
}
3732
/// Decodes null / string / null through a dense union with type ids 42 and 7 and
/// checks that the null branch accumulates rows in its own child.
#[test]
fn test_union_dense_with_null_and_string_children() {
    let union_dt = make_dense_union_avro(
        vec![
            (Codec::Null, "n", DataType::Null),
            (Codec::Utf8, "s", DataType::Utf8),
        ],
        vec![42, 7],
    );
    let mut dec = Decoder::try_new(&union_dt).unwrap();

    // A null row is just the branch index; the string row adds its payload.
    let null_row = encode_avro_long(0);
    let mut str_row = encode_avro_long(1);
    str_row.extend_from_slice(&encode_avro_bytes(b"abc"));
    for row in [&null_row, &str_row, &null_row] {
        dec.decode(&mut AvroCursor::new(row)).unwrap();
    }

    let array = dec.flush(None).unwrap();
    let ua = array
        .as_any()
        .downcast_ref::<UnionArray>()
        .expect("expected UnionArray");
    assert_eq!(ua.len(), 3);

    // (type id, offset into that child) expected for rows 0..3.
    let expected_slots = [(42, 0), (7, 0), (42, 1)];
    for (row, (type_id, _)) in expected_slots.iter().enumerate() {
        assert_eq!(ua.type_id(row), *type_id);
    }
    for (row, (_, offset)) in expected_slots.iter().enumerate() {
        assert_eq!(ua.value_offset(row), *offset);
    }

    let null_child = ua
        .child(42)
        .as_any()
        .downcast_ref::<NullArray>()
        .expect("null child");
    assert_eq!(null_child.len(), 2);

    let str_child = ua
        .child(7)
        .as_any()
        .downcast_ref::<StringArray>()
        .expect("string child");
    assert_eq!(str_child.len(), 1);
    assert_eq!(str_child.value(0), "abc");
}
3777
/// A branch index of -1 must be rejected with a "Negative union branch index" error.
#[test]
fn test_union_decode_negative_branch_index_errors() {
    let children = vec![
        (Codec::Int32, "i", DataType::Int32),
        (Codec::Utf8, "s", DataType::Utf8),
    ];
    let mut dec = Decoder::try_new(&make_dense_union_avro(children, vec![0, 1])).unwrap();

    let row = encode_avro_long(-1);
    let mut cursor = AvroCursor::new(&row);
    let msg = dec
        .decode(&mut cursor)
        .expect_err("expected error for negative branch index")
        .to_string();
    assert!(
        msg.contains("Negative union branch index"),
        "unexpected error message: {msg}"
    );
}
3798
/// A branch index equal to the number of children (2) must be rejected as out of range.
#[test]
fn test_union_decode_out_of_range_branch_index_errors() {
    let children = vec![
        (Codec::Int32, "i", DataType::Int32),
        (Codec::Utf8, "s", DataType::Utf8),
    ];
    let mut dec = Decoder::try_new(&make_dense_union_avro(children, vec![10, 11])).unwrap();

    let row = encode_avro_long(2);
    let mut cursor = AvroCursor::new(&row);
    let msg = dec
        .decode(&mut cursor)
        .expect_err("expected error for out-of-range branch index")
        .to_string();
    assert!(
        msg.contains("out of range"),
        "unexpected error message: {msg}"
    );
}
3819
/// Constructing a decoder for a sparse-mode union must fail with a clear message.
#[test]
fn test_union_sparse_mode_not_supported() {
    let children: Vec<AvroDataType> = [Codec::Int32, Codec::Utf8]
        .into_iter()
        .map(|codec| AvroDataType::new(codec, Default::default(), None))
        .collect();
    let arrow_fields = vec![
        arrow_schema::Field::new("i", DataType::Int32, true),
        arrow_schema::Field::new("s", DataType::Utf8, true),
    ];
    let uf = UnionFields::new(vec![1, 3], arrow_fields);

    // Same shape as the dense tests, but with `UnionMode::Sparse`.
    let dt = AvroDataType::new(
        Codec::Union(children.into(), uf, UnionMode::Sparse),
        Default::default(),
        None,
    );
    let msg = Decoder::try_new(&dt)
        .expect_err("sparse union should not be supported")
        .to_string();
    assert!(
        msg.contains("Sparse Arrow unions are not yet supported"),
        "unexpected error message: {msg}"
    );
}
3842
3843 fn make_record_decoder_with_projector_defaults(
3844 reader_fields: &[(&str, DataType, bool)],
3845 field_defaults: Vec<Option<AvroLiteral>>,
3846 default_injections: Vec<(usize, AvroLiteral)>,
3847 writer_to_reader_len: usize,
3848 ) -> Decoder {
3849 assert_eq!(
3850 field_defaults.len(),
3851 reader_fields.len(),
3852 "field_defaults must have one entry per reader field"
3853 );
3854 let mut field_refs: Vec<FieldRef> = Vec::with_capacity(reader_fields.len());
3855 let mut encodings: Vec<Decoder> = Vec::with_capacity(reader_fields.len());
3856 for (name, dt, nullable) in reader_fields {
3857 field_refs.push(Arc::new(ArrowField::new(*name, dt.clone(), *nullable)));
3858 let enc = match dt {
3859 DataType::Int32 => Decoder::Int32(Vec::with_capacity(DEFAULT_CAPACITY)),
3860 DataType::Int64 => Decoder::Int64(Vec::with_capacity(DEFAULT_CAPACITY)),
3861 DataType::Utf8 => Decoder::String(
3862 OffsetBufferBuilder::new(DEFAULT_CAPACITY),
3863 Vec::with_capacity(DEFAULT_CAPACITY),
3864 ),
3865 other => panic!("Unsupported test field type in helper: {other:?}"),
3866 };
3867 encodings.push(enc);
3868 }
3869 let fields: Fields = field_refs.into();
3870 let skip_decoders: Vec<Option<Skipper>> =
3871 (0..writer_to_reader_len).map(|_| None::<Skipper>).collect();
3872 let projector = Projector {
3873 writer_to_reader: Arc::from(vec![None; writer_to_reader_len]),
3874 skip_decoders,
3875 field_defaults,
3876 default_injections: Arc::from(default_injections),
3877 };
3878 Decoder::Record(fields, encodings, Some(projector))
3879 }
3880
/// `append_default` accepts an int literal for Int32, and both int and long
/// literals for Int64.
#[test]
fn test_default_append_int32_and_int64_from_int_and_long() {
    // Int32 decoder fed an int literal.
    let mut d_i32 = Decoder::Int32(Vec::with_capacity(DEFAULT_CAPACITY));
    d_i32.append_default(&AvroLiteral::Int(42)).unwrap();
    let flushed32 = d_i32.flush(None).unwrap();
    let ints = flushed32.as_any().downcast_ref::<Int32Array>().unwrap();
    assert_eq!(ints.len(), 1);
    assert_eq!(ints.value(0), 42);

    // Int64 decoder fed an int literal and then a long literal.
    let mut d_i64 = Decoder::Int64(Vec::with_capacity(DEFAULT_CAPACITY));
    for lit in [AvroLiteral::Int(5), AvroLiteral::Long(7)] {
        d_i64.append_default(&lit).unwrap();
    }
    let flushed64 = d_i64.flush(None).unwrap();
    let longs = flushed64.as_any().downcast_ref::<Int64Array>().unwrap();
    assert_eq!(longs.len(), 2);
    assert_eq!(longs.value(0), 5);
    assert_eq!(longs.value(1), 7);
}
3898
/// `append_default` stores float and double literals in the matching decoders.
#[test]
fn test_default_append_floats_and_doubles() {
    // Float32 decoder with a float literal.
    let mut float_dec = Decoder::Float32(Vec::with_capacity(DEFAULT_CAPACITY));
    float_dec.append_default(&AvroLiteral::Float(1.5)).unwrap();
    let flushed32 = float_dec.flush(None).unwrap();
    let floats = flushed32.as_any().downcast_ref::<Float32Array>().unwrap();
    assert_eq!(floats.value(0), 1.5);

    // Float64 decoder with a double literal.
    let mut double_dec = Decoder::Float64(Vec::with_capacity(DEFAULT_CAPACITY));
    double_dec
        .append_default(&AvroLiteral::Double(2.25))
        .unwrap();
    let flushed64 = double_dec.flush(None).unwrap();
    let doubles = flushed64.as_any().downcast_ref::<Float64Array>().unwrap();
    assert_eq!(doubles.value(0), 2.25);
}
3912
/// `append_default` accepts a string literal for strings and a bytes literal for
/// binary, and rejects a bytes literal for a string decoder.
#[test]
fn test_default_append_string_and_bytes() {
    let new_string_decoder = || {
        Decoder::String(
            OffsetBufferBuilder::new(DEFAULT_CAPACITY),
            Vec::with_capacity(DEFAULT_CAPACITY),
        )
    };

    // String literal into a string decoder.
    let mut d_str = new_string_decoder();
    d_str
        .append_default(&AvroLiteral::String("hi".into()))
        .unwrap();
    let flushed_str = d_str.flush(None).unwrap();
    let strings = flushed_str.as_any().downcast_ref::<StringArray>().unwrap();
    assert_eq!(strings.value(0), "hi");

    // Bytes literal into a binary decoder.
    let mut d_bytes = Decoder::Binary(
        OffsetBufferBuilder::new(DEFAULT_CAPACITY),
        Vec::with_capacity(DEFAULT_CAPACITY),
    );
    d_bytes
        .append_default(&AvroLiteral::Bytes(vec![1, 2, 3]))
        .unwrap();
    let flushed_bytes = d_bytes.flush(None).unwrap();
    let binaries = flushed_bytes
        .as_any()
        .downcast_ref::<BinaryArray>()
        .unwrap();
    assert_eq!(binaries.value(0), &[1, 2, 3]);

    // Bytes literal into a string decoder is a type mismatch.
    let mut d_str_err = new_string_decoder();
    let err = d_str_err
        .append_default(&AvroLiteral::Bytes(vec![0x61, 0x62]))
        .unwrap_err();
    assert!(
        err.to_string()
            .contains("Default for string must be string"),
        "unexpected error: {err:?}"
    );
}
3948
/// A nullable Int32 decoder accepts both a null default and an int default.
#[test]
fn test_default_append_nullable_int32_null_and_value() {
    let mut dec = Decoder::Nullable(
        Nullability::NullFirst,
        NullBufferBuilder::new(DEFAULT_CAPACITY),
        Box::new(Decoder::Int32(Vec::with_capacity(DEFAULT_CAPACITY))),
        NullablePlan::ReadTag,
    );

    // Null first, then a concrete value.
    for lit in [AvroLiteral::Null, AvroLiteral::Int(11)] {
        dec.append_default(&lit).unwrap();
    }

    let flushed = dec.flush(None).unwrap();
    let ints = flushed.as_any().downcast_ref::<Int32Array>().unwrap();
    assert_eq!(ints.len(), 2);
    assert!(ints.is_null(0));
    assert_eq!(ints.value(1), 11);
}
3966
/// Appends an array literal `[1, 2, 3]` as the default of a list-of-int
/// decoder and checks the resulting single-row `ListArray`.
#[test]
fn test_default_append_array_of_ints() {
    let element_type = Arc::new(avro_from_codec(Codec::Int32));
    let list_type = avro_from_codec(Codec::List(element_type));
    let mut decoder = Decoder::try_new(&list_type).unwrap();

    let elements: Vec<AvroLiteral> = [1, 2, 3].into_iter().map(AvroLiteral::Int).collect();
    decoder
        .append_default(&AvroLiteral::Array(elements))
        .unwrap();

    let flushed = decoder.flush(None).unwrap();
    let lists = flushed.as_any().downcast_ref::<ListArray>().unwrap();
    assert_eq!(lists.len(), 1);
    assert_eq!(lists.value_length(0), 3);
    let child = lists
        .values()
        .as_any()
        .downcast_ref::<Int32Array>()
        .unwrap();
    assert_eq!(child.values(), &[1, 2, 3]);
}
3984
/// Appends a two-entry map literal as the default of a map-of-int decoder
/// and checks the keys and values of the single flushed row. Keys and values
/// are compared as sets, so the test does not depend on entry order.
#[test]
fn test_default_append_map_string_to_int() {
    use std::collections::HashSet;

    let value_type = Arc::new(avro_from_codec(Codec::Int32));
    let map_type = avro_from_codec(Codec::Map(value_type));
    let mut decoder = Decoder::try_new(&map_type).unwrap();

    let mut default_entries: IndexMap<String, AvroLiteral> = IndexMap::new();
    for (key, value) in [("k1", 10), ("k2", 20)] {
        default_entries.insert(key.to_string(), AvroLiteral::Int(value));
    }
    decoder
        .append_default(&AvroLiteral::Map(default_entries))
        .unwrap();

    let flushed = decoder.flush(None).unwrap();
    let maps = flushed.as_any().downcast_ref::<MapArray>().unwrap();
    assert_eq!(maps.len(), 1);
    assert_eq!(maps.value_length(0), 2);

    // Keep the row alive while its key/value columns are borrowed.
    let first_row = maps.value(0);
    let entry_struct = first_row.as_any().downcast_ref::<StructArray>().unwrap();
    let key_column = entry_struct
        .column_by_name("key")
        .unwrap()
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    let value_column = entry_struct
        .column_by_name("value")
        .unwrap()
        .as_any()
        .downcast_ref::<Int32Array>()
        .unwrap();

    let seen_keys: HashSet<&str> = (0..key_column.len())
        .map(|row| key_column.value(row))
        .collect();
    assert_eq!(seen_keys, HashSet::from(["k1", "k2"]));
    let seen_values: HashSet<i32> = (0..value_column.len())
        .map(|row| value_column.value(row))
        .collect();
    assert_eq!(seen_values, HashSet::from([10, 20]));
}
4016
/// Appends the enum default `"B"` to an enum decoder over symbols
/// `A, B, C` and checks that the dictionary key is 1 and that dictionary
/// value 1 is `"B"`.
#[test]
fn test_default_append_enum_by_symbol() {
    let symbols: Arc<[String]> = vec!["A".into(), "B".into(), "C".into()].into();
    let mut decoder = Decoder::Enum(
        Vec::with_capacity(DEFAULT_CAPACITY),
        Arc::clone(&symbols),
        None,
    );
    let chosen = AvroLiteral::Enum("B".into());
    decoder.append_default(&chosen).unwrap();

    let flushed = decoder.flush(None).unwrap();
    let dictionary = flushed
        .as_any()
        .downcast_ref::<DictionaryArray<Int32Type>>()
        .unwrap();
    assert_eq!(dictionary.len(), 1);
    assert_eq!(dictionary.keys(), &Int32Array::from(vec![1]));

    let dictionary_values = dictionary
        .values()
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    assert_eq!(dictionary_values.value(1), "B");
}
4037
/// A UUID default given as a string yields one 16-byte fixed-size binary
/// value; a bytes literal is rejected as a UUID default.
#[test]
fn test_default_append_uuid_and_type_error() {
    // Accepted: textual UUID.
    let mut uuid_decoder = Decoder::Uuid(Vec::with_capacity(DEFAULT_CAPACITY));
    let textual = AvroLiteral::String("123e4567-e89b-12d3-a456-426614174000".into());
    uuid_decoder.append_default(&textual).unwrap();
    let flushed = uuid_decoder.flush(None).unwrap();
    let fixed = flushed
        .as_any()
        .downcast_ref::<FixedSizeBinaryArray>()
        .unwrap();
    assert_eq!(fixed.value_length(), 16);
    assert_eq!(fixed.len(), 1);

    // Rejected: raw bytes, even of the right length.
    let mut rejecting_decoder = Decoder::Uuid(Vec::with_capacity(DEFAULT_CAPACITY));
    let raw = AvroLiteral::Bytes(vec![0u8; 16]);
    let err = rejecting_decoder.append_default(&raw).unwrap_err();
    let rendered = err.to_string();
    assert!(
        rendered.contains("Default for uuid must be string"),
        "unexpected error: {err:?}"
    );
}
4060
/// A 4-byte default is accepted by a `Fixed(4)` decoder, while a 3-byte
/// default is rejected with a length error.
#[test]
fn test_default_append_fixed_and_length_mismatch() {
    // Matching length.
    let mut fixed_decoder = Decoder::Fixed(4, Vec::with_capacity(DEFAULT_CAPACITY));
    let exact = AvroLiteral::Bytes(vec![1, 2, 3, 4]);
    fixed_decoder.append_default(&exact).unwrap();
    let flushed = fixed_decoder.flush(None).unwrap();
    let fixed = flushed
        .as_any()
        .downcast_ref::<FixedSizeBinaryArray>()
        .unwrap();
    assert_eq!(fixed.value_length(), 4);
    assert_eq!(fixed.value(0), &[1, 2, 3, 4]);

    // One byte short.
    let mut rejecting_decoder = Decoder::Fixed(4, Vec::with_capacity(DEFAULT_CAPACITY));
    let too_short = AvroLiteral::Bytes(vec![1, 2, 3]);
    let err = rejecting_decoder.append_default(&too_short).unwrap_err();
    let rendered = err.to_string();
    assert!(
        rendered.contains("Fixed default length"),
        "unexpected error: {err:?}"
    );
}
4082
/// A 12-byte interval default built from the little-endian `u32` values
/// 1, 2, 3 flushes as months = 1, days = 2, nanoseconds = 3_000_000.
/// An 11-byte default is rejected.
#[test]
fn test_default_append_duration_and_length_validation() {
    let interval_type = avro_from_codec(Codec::Interval);
    let mut decoder = Decoder::try_new(&interval_type).unwrap();

    // Three little-endian u32 components, 12 bytes in total.
    let encoded: Vec<u8> = [1u32, 2u32, 3u32]
        .into_iter()
        .flat_map(u32::to_le_bytes)
        .collect();
    decoder
        .append_default(&AvroLiteral::Bytes(encoded))
        .unwrap();

    let flushed = decoder.flush(None).unwrap();
    let intervals = flushed
        .as_any()
        .downcast_ref::<IntervalMonthDayNanoArray>()
        .unwrap();
    assert_eq!(intervals.len(), 1);
    let first = intervals.value(0);
    assert_eq!(first.months, 1);
    assert_eq!(first.days, 2);
    assert_eq!(first.nanoseconds, 3_000_000);

    // One byte short of the required 12.
    let mut rejecting_decoder = Decoder::try_new(&avro_from_codec(Codec::Interval)).unwrap();
    let too_short = AvroLiteral::Bytes(vec![0u8; 11]);
    let err = rejecting_decoder.append_default(&too_short).unwrap_err();
    let rendered = err.to_string();
    assert!(
        rendered.contains("Duration default must be exactly 12 bytes"),
        "unexpected error: {err:?}"
    );
}
4112
/// Appends two 32-byte big-endian two's-complement defaults to a
/// `Decimal(50, scale 2, size 32)` decoder and checks they render as
/// `123.45` and `-1.23`.
#[test]
fn test_default_append_decimal256_from_bytes() {
    let decimal_type = avro_from_codec(Codec::Decimal(50, Some(2), Some(32)));
    let mut decoder = Decoder::try_new(&decimal_type).unwrap();

    // 0x3039 == 12345, zero-extended to 32 bytes.
    let mut positive = [0x00u8; 32];
    positive[30] = 0x30;
    positive[31] = 0x39;
    decoder
        .append_default(&AvroLiteral::Bytes(positive.to_vec()))
        .unwrap();

    // 0x85 preceded by 0xFF bytes == -123 in two's complement.
    let mut negative = [0xFFu8; 32];
    negative[31] = 0x85;
    decoder
        .append_default(&AvroLiteral::Bytes(negative.to_vec()))
        .unwrap();

    let flushed = decoder.flush(None).unwrap();
    let decimals = flushed
        .as_any()
        .downcast_ref::<Decimal256Array>()
        .unwrap();
    assert_eq!(decimals.len(), 2);
    assert_eq!(decimals.value_as_string(0), "123.45");
    assert_eq!(decimals.value_as_string(1), "-1.23");
}
4135
/// When a record default map only supplies field `a`, field `b` is filled
/// from the projector's per-field default (`"hi"`).
#[test]
fn test_record_append_default_map_missing_fields_uses_projector_field_defaults() {
    let schema_fields = [("a", DataType::Int32, false), ("b", DataType::Utf8, false)];
    let projector_defaults = vec![None, Some(AvroLiteral::String("hi".into()))];
    let mut record =
        make_record_decoder_with_projector_defaults(&schema_fields, projector_defaults, vec![], 0);

    // Only `a` is present in the default map.
    let mut partial: IndexMap<String, AvroLiteral> = IndexMap::new();
    partial.insert("a".to_string(), AvroLiteral::Int(7));
    record.append_default(&AvroLiteral::Map(partial)).unwrap();

    let flushed = record.flush(None).unwrap();
    let structs = flushed.as_any().downcast_ref::<StructArray>().unwrap();
    let column_a = structs
        .column_by_name("a")
        .unwrap()
        .as_any()
        .downcast_ref::<Int32Array>()
        .unwrap();
    let column_b = structs
        .column_by_name("b")
        .unwrap()
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    assert_eq!(column_a.value(0), 7);
    assert_eq!(column_b.value(0), "hi");
}
4165
/// Appending a `Null` default to a record decoder fills every field from the
/// projector's per-field defaults (`5` and `"x"`).
#[test]
fn test_record_append_default_null_uses_projector_field_defaults() {
    let schema_fields = [("a", DataType::Int32, false), ("b", DataType::Utf8, false)];
    let projector_defaults = vec![
        Some(AvroLiteral::Int(5)),
        Some(AvroLiteral::String("x".into())),
    ];
    let mut record =
        make_record_decoder_with_projector_defaults(&schema_fields, projector_defaults, vec![], 0);

    record.append_default(&AvroLiteral::Null).unwrap();

    let flushed = record.flush(None).unwrap();
    let structs = flushed.as_any().downcast_ref::<StructArray>().unwrap();
    let column_a = structs
        .column_by_name("a")
        .unwrap()
        .as_any()
        .downcast_ref::<Int32Array>()
        .unwrap();
    let column_b = structs
        .column_by_name("b")
        .unwrap()
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    assert_eq!(column_a.value(0), 5);
    assert_eq!(column_b.value(0), "x");
}
4196
/// With no projector field defaults, a field missing from the record default
/// map comes out null when its decoder is nullable, while the supplied field
/// keeps its value.
#[test]
fn test_record_append_default_missing_fields_without_projector_defaults_yields_type_nulls_or_empties()
{
    // Two nullable Arrow fields: `a: Int32` and `b: Utf8`.
    let field_refs: Vec<FieldRef> = [("a", DataType::Int32), ("b", DataType::Utf8)]
        .into_iter()
        .map(|(name, data_type)| Arc::new(ArrowField::new(name, data_type, true)))
        .collect();

    let int_decoder = Decoder::Nullable(
        Nullability::NullSecond,
        NullBufferBuilder::new(DEFAULT_CAPACITY),
        Box::new(Decoder::Int32(Vec::with_capacity(DEFAULT_CAPACITY))),
        NullablePlan::ReadTag,
    );
    let string_decoder = Decoder::Nullable(
        Nullability::NullSecond,
        NullBufferBuilder::new(DEFAULT_CAPACITY),
        Box::new(Decoder::String(
            OffsetBufferBuilder::new(DEFAULT_CAPACITY),
            Vec::with_capacity(DEFAULT_CAPACITY),
        )),
        NullablePlan::ReadTag,
    );
    let field_decoders: Vec<Decoder> = vec![int_decoder, string_decoder];

    // Projector carrying neither field defaults nor default injections.
    let projector = Projector {
        writer_to_reader: Arc::from(vec![]),
        skip_decoders: vec![],
        field_defaults: vec![None, None],
        default_injections: Arc::from(Vec::<(usize, AvroLiteral)>::new()),
    };
    let mut record = Decoder::Record(field_refs.into(), field_decoders, Some(projector));

    // Only `a` is present in the default map.
    let mut partial: IndexMap<String, AvroLiteral> = IndexMap::new();
    partial.insert("a".to_string(), AvroLiteral::Int(9));
    record.append_default(&AvroLiteral::Map(partial)).unwrap();

    let flushed = record.flush(None).unwrap();
    let structs = flushed.as_any().downcast_ref::<StructArray>().unwrap();
    let column_a = structs
        .column_by_name("a")
        .unwrap()
        .as_any()
        .downcast_ref::<Int32Array>()
        .unwrap();
    let column_b = structs
        .column_by_name("b")
        .unwrap()
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    assert!(column_a.is_valid(0));
    assert_eq!(column_a.value(0), 9);
    assert!(column_b.is_null(0));
}
4251
/// Decoding a record from an empty buffer with default injections for both
/// fields yields the injected values (`99` and `"alice"`).
#[test]
fn test_projector_default_injection_when_writer_lacks_fields() {
    let schema_fields = [
        ("id", DataType::Int32, false),
        ("name", DataType::Utf8, false),
    ];
    let no_field_defaults = vec![None, None];
    let injected = vec![
        (0, AvroLiteral::Int(99)),
        (1, AvroLiteral::String("alice".into())),
    ];
    let mut record = make_record_decoder_with_projector_defaults(
        &schema_fields,
        no_field_defaults,
        injected,
        0,
    );

    // No input bytes: every value must come from the injections.
    let mut empty_cursor = AvroCursor::new(&[]);
    record.decode(&mut empty_cursor).unwrap();

    let flushed = record.flush(None).unwrap();
    let structs = flushed.as_any().downcast_ref::<StructArray>().unwrap();
    let id_column = structs
        .column_by_name("id")
        .unwrap()
        .as_any()
        .downcast_ref::<Int32Array>()
        .unwrap();
    let name_column = structs
        .column_by_name("name")
        .unwrap()
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    assert_eq!(id_column.value(0), 99);
    assert_eq!(name_column.value(0), "alice");
}
4286
/// Builds a dense union whose Arrow type ids (42 and 7) differ from the
/// branch positions (0 and 1) and checks that decoded rows carry the type
/// ids declared in `UnionFields`, not the branch indexes.
#[test]
fn union_type_ids_are_not_child_indexes() {
    let branch_types: Vec<AvroDataType> =
        vec![avro_from_codec(Codec::Int32), avro_from_codec(Codec::Utf8)];
    let int_field = Arc::new(ArrowField::new("a", DataType::Int32, true));
    let utf8_field = Arc::new(ArrowField::new("b", DataType::Utf8, true));
    let fields: UnionFields = [(42_i8, int_field), (7_i8, utf8_field)]
        .into_iter()
        .collect();
    let union_type = avro_from_codec(Codec::Union(
        branch_types.into(),
        fields.clone(),
        UnionMode::Dense,
    ));
    let mut decoder = Decoder::try_new(&union_type).expect("decoder");

    // Row 0: branch 1 (string "hi").
    let mut string_row = encode_avro_long(1);
    string_row.extend(encode_avro_bytes("hi".as_bytes()));
    decoder
        .decode(&mut AvroCursor::new(&string_row))
        .expect("decode b1");

    // Row 1: branch 0 (int 5).
    let mut int_row = encode_avro_long(0);
    int_row.extend(encode_avro_int(5));
    decoder
        .decode(&mut AvroCursor::new(&int_row))
        .expect("decode b0");

    let flushed = decoder.flush(None).expect("flush");
    let union = flushed
        .as_any()
        .downcast_ref::<UnionArray>()
        .expect("union");
    assert_eq!(union.len(), 2);
    assert_eq!(union.type_id(0), 7, "type id must come from UnionFields");
    assert_eq!(union.type_id(1), 42, "type id must come from UnionFields");
    assert_eq!(union.value_offset(0), 0);
    assert_eq!(union.value_offset(1), 0);

    let strings = union
        .child(7)
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    assert_eq!(strings.len(), 1);
    assert_eq!(strings.value(0), "hi");

    let ints = union
        .child(42)
        .as_any()
        .downcast_ref::<Int32Array>()
        .unwrap();
    assert_eq!(ints.len(), 1);
    assert_eq!(ints.value(0), 5);

    // The declared order of type ids is left untouched.
    let declared_ids: Vec<i8> = fields.iter().map(|(type_id, _)| type_id).collect();
    assert_eq!(declared_ids, vec![42_i8, 7_i8]);
}
4325
/// Every custom duration codec (nanos, micros, millis, seconds) must map to
/// `Skipper::Int64`.
#[cfg(feature = "avro_custom_types")]
#[test]
fn skipper_from_avro_maps_custom_duration_variants_to_int64() -> Result<(), ArrowError> {
    let duration_codecs = [
        Codec::DurationNanos,
        Codec::DurationMicros,
        Codec::DurationMillis,
        Codec::DurationSeconds,
    ];
    for codec in duration_codecs {
        let avro_type = make_avro_dt(codec.clone(), None);
        let skipper = Skipper::from_avro(&avro_type)?;
        if !matches!(skipper, Skipper::Int64) {
            panic!("expected Int64 skipper for {:?}, got {:?}", codec, skipper);
        }
    }
    Ok(())
}
4344
/// For each custom duration codec, skipping a single encoded long must
/// advance the cursor past exactly the bytes of that long.
#[cfg(feature = "avro_custom_types")]
#[test]
fn skipper_skip_consumes_one_long_for_custom_durations() -> Result<(), ArrowError> {
    let samples: [i64; 7] = [0, 1, -1, 150, -150, i64::MAX / 3, i64::MIN / 3];
    let duration_codecs = [
        Codec::DurationNanos,
        Codec::DurationMicros,
        Codec::DurationMillis,
        Codec::DurationSeconds,
    ];
    for codec in duration_codecs {
        let avro_type = make_avro_dt(codec.clone(), None);
        let mut skipper = Skipper::from_avro(&avro_type)?;
        for sample in samples.iter().copied() {
            let encoded = encode_avro_long(sample);
            let mut cursor = AvroCursor::new(&encoded);
            skipper.skip(&mut cursor)?;
            assert_eq!(
                cursor.position(),
                encoded.len(),
                "did not consume all bytes for {:?} value {}",
                codec,
                sample
            );
        }
    }
    Ok(())
}
4372
/// A null-first nullable duration builds `Nullable(NullFirst, Int64)`.
/// Skipping tag 0 consumes one byte; skipping tag 1 followed by `long(0)`
/// consumes two bytes.
#[cfg(feature = "avro_custom_types")]
#[test]
fn skipper_nullable_custom_duration_respects_null_first() -> Result<(), ArrowError> {
    let avro_type = make_avro_dt(Codec::DurationNanos, Some(Nullability::NullFirst));
    let mut skipper = Skipper::from_avro(&avro_type)?;

    // Shape check: nullable wrapper around an Int64 skipper.
    match &skipper {
        Skipper::Nullable(Nullability::NullFirst, inner) => match &**inner {
            Skipper::Int64 => {}
            other => panic!("expected inner Int64, got {:?}", other),
        },
        other => panic!("expected Nullable(NullFirst, Int64), got {:?}", other),
    }

    // Tag 0 on its own.
    let tag_only = encode_vlq_u64(0);
    let mut tag_cursor = AvroCursor::new(&tag_only);
    skipper.skip(&mut tag_cursor)?;
    assert_eq!(tag_cursor.position(), 1, "expected to consume only tag=0");

    // Tag 1 followed by an encoded long.
    let mut tag_and_value = encode_vlq_u64(1);
    tag_and_value.extend(encode_avro_long(0));
    let mut value_cursor = AvroCursor::new(&tag_and_value);
    skipper.skip(&mut value_cursor)?;
    assert_eq!(
        value_cursor.position(),
        2,
        "expected to consume tag=1 + long(0)"
    );

    Ok(())
}
4401
/// A null-second nullable duration builds `Nullable(NullSecond, Int64)`.
/// Skipping tag 1 consumes one byte; skipping tag 0 followed by `long(-1)`
/// consumes the tag plus the encoded long.
#[cfg(feature = "avro_custom_types")]
#[test]
fn skipper_nullable_custom_duration_respects_null_second() -> Result<(), ArrowError> {
    let avro_type = make_avro_dt(Codec::DurationMicros, Some(Nullability::NullSecond));
    let mut skipper = Skipper::from_avro(&avro_type)?;

    // Shape check: nullable wrapper around an Int64 skipper.
    match &skipper {
        Skipper::Nullable(Nullability::NullSecond, inner) => match &**inner {
            Skipper::Int64 => {}
            other => panic!("expected inner Int64, got {:?}", other),
        },
        other => panic!("expected Nullable(NullSecond, Int64), got {:?}", other),
    }

    // Tag 1 on its own.
    let tag_only = encode_vlq_u64(1);
    let mut tag_cursor = AvroCursor::new(&tag_only);
    skipper.skip(&mut tag_cursor)?;
    assert_eq!(tag_cursor.position(), 1, "expected to consume only tag=1");

    // Tag 0 followed by an encoded long.
    let encoded_value = encode_avro_long(-1);
    let expected_position = 1 + encoded_value.len();
    let mut tag_and_value = encode_vlq_u64(0);
    tag_and_value.extend(encoded_value);
    let mut value_cursor = AvroCursor::new(&tag_and_value);
    skipper.skip(&mut value_cursor)?;
    assert_eq!(
        value_cursor.position(),
        expected_position,
        "expected to consume tag=0 + long(-1)"
    );
    Ok(())
}
4433
/// The interval codec maps to `Skipper::DurationFixed12`, and skipping one
/// value advances the cursor by 12 bytes.
#[test]
fn skipper_interval_is_fixed12_and_skips_12_bytes() -> Result<(), ArrowError> {
    let avro_type = make_avro_dt(Codec::Interval, None);
    let mut skipper = Skipper::from_avro(&avro_type)?;
    if !matches!(skipper, Skipper::DurationFixed12) {
        panic!("expected DurationFixed12, got {:?}", skipper);
    }

    let twelve_zero_bytes = vec![0u8; 12];
    let mut cursor = AvroCursor::new(&twelve_zero_bytes);
    skipper.skip(&mut cursor)?;
    assert_eq!(cursor.position(), 12, "expected to consume 12 fixed bytes");
    Ok(())
}
4448
/// Decoding `1,1,1,2,2,3,3,3,3` through a run-end-encoded decoder declared
/// with width 16 yields a `RunArray<Int16Type>` with run ends `[3, 5, 9]`
/// and values `[1, 2, 3]`.
#[cfg(feature = "avro_custom_types")]
#[test]
fn test_run_end_encoded_width16_int32_basic_grouping() {
    use arrow_array::RunArray;
    use std::sync::Arc;

    let value_type = Arc::new(avro_from_codec(Codec::Int32));
    let ree_type = AvroDataType::new(
        Codec::RunEndEncoded(value_type, 16),
        Default::default(),
        None,
    );
    let mut decoder = Decoder::try_new(&ree_type).expect("create REE decoder");

    let input = [1, 1, 1, 2, 2, 3, 3, 3, 3];
    for value in input {
        let encoded = encode_avro_int(value);
        let mut cursor = AvroCursor::new(&encoded);
        decoder.decode(&mut cursor).expect("decode");
    }

    let flushed = decoder.flush(None).expect("flush");
    let runs = flushed
        .as_any()
        .downcast_ref::<RunArray<Int16Type>>()
        .expect("RunArray<Int16Type>");
    assert_eq!(runs.len(), 9);
    assert_eq!(runs.run_ends().values(), &[3, 5, 9]);

    let run_values = runs
        .values()
        .as_ref()
        .as_any()
        .downcast_ref::<Int32Array>()
        .expect("values Int32");
    assert_eq!(run_values.values(), &[1, 2, 3]);
}
4480
/// Run-end encoding over a null-second nullable Int32: consecutive nulls are
/// grouped into a single run just like consecutive equal values, giving run
/// ends `[2, 5, 6, 8]` and values `[null, 7, null, 5]`.
#[cfg(feature = "avro_custom_types")]
#[test]
fn test_run_end_encoded_width32_nullable_values_group_nulls() {
    use arrow_array::RunArray;
    use std::sync::Arc;

    let nullable_int = AvroDataType::new(
        Codec::Int32,
        Default::default(),
        Some(Nullability::NullSecond),
    );
    let ree_type = AvroDataType::new(
        Codec::RunEndEncoded(Arc::new(nullable_int), 32),
        Default::default(),
        None,
    );
    let mut decoder = Decoder::try_new(&ree_type).expect("create REE decoder");

    let input: [Option<i32>; 8] = [
        None,
        None,
        Some(7),
        Some(7),
        Some(7),
        None,
        Some(5),
        Some(5),
    ];
    for item in input {
        // Null-second layout: tag 1 is null, tag 0 is followed by the int.
        let mut encoded = Vec::new();
        if let Some(value) = item {
            encoded.extend_from_slice(&encode_vlq_u64(0));
            encoded.extend_from_slice(&encode_avro_int(value));
        } else {
            encoded.extend_from_slice(&encode_vlq_u64(1));
        }
        decoder
            .decode(&mut AvroCursor::new(&encoded))
            .expect("decode");
    }

    let flushed = decoder.flush(None).expect("flush");
    let runs = flushed
        .as_any()
        .downcast_ref::<RunArray<Int32Type>>()
        .expect("RunArray<Int32Type>");
    assert_eq!(runs.len(), 8);
    assert_eq!(runs.run_ends().values(), &[2, 5, 6, 8]);

    let run_values = runs
        .values()
        .as_ref()
        .as_any()
        .downcast_ref::<Int32Array>()
        .expect("values Int32 (nullable)");
    assert_eq!(run_values.len(), 4);
    assert!(run_values.is_null(0));
    assert_eq!(run_values.value(1), 7);
    assert!(run_values.is_null(2));
    assert_eq!(run_values.value(3), 5);
}
4537
/// Wraps a run-end-encoded Float64 decoder in a nullable decoder that uses
/// the `FromSingle` plan with int-to-double promotion, feeds it Avro ints
/// `1,1,2,2,2`, and expects run ends `[2, 5]` over values `[1.0, 2.0]`.
#[cfg(feature = "avro_custom_types")]
#[test]
fn test_run_end_encoded_decode_with_promotion_int_to_double_via_nullable_from_single() {
    use arrow_array::RunArray;

    // NOTE(review): the leading `8` looks like a run-end width in bytes (the
    // result is downcast to `RunArray<Int64Type>`) and the `0` like an
    // initial count — confirm against the `Decoder::RunEndEncoded` variant.
    let run_end_decoder = Decoder::RunEndEncoded(
        8,
        0,
        Box::new(Decoder::Float64(Vec::with_capacity(DEFAULT_CAPACITY))),
    );
    let mut decoder = Decoder::Nullable(
        Nullability::NullSecond,
        NullBufferBuilder::new(DEFAULT_CAPACITY),
        Box::new(run_end_decoder),
        NullablePlan::FromSingle {
            promotion: Promotion::IntToDouble,
        },
    );

    let input = [1, 1, 2, 2, 2];
    for value in input {
        let encoded = encode_avro_int(value);
        let mut cursor = AvroCursor::new(&encoded);
        decoder.decode(&mut cursor).expect("decode");
    }

    let flushed = decoder.flush(None).expect("flush");
    let runs = flushed
        .as_any()
        .downcast_ref::<RunArray<Int64Type>>()
        .expect("RunArray<Int64Type>");
    assert_eq!(runs.len(), 5);
    assert_eq!(runs.run_ends().values(), &[2, 5]);

    let run_values = runs
        .values()
        .as_ref()
        .as_any()
        .downcast_ref::<Float64Array>()
        .expect("values Float64");
    assert_eq!(run_values.values(), &[1.0, 2.0]);
}
4575
/// A run-end width of 3 is rejected by `Decoder::try_new`, and the error
/// message names the supported widths.
#[cfg(feature = "avro_custom_types")]
#[test]
fn test_run_end_encoded_unsupported_run_end_width_errors() {
    use std::sync::Arc;

    let value_type = Arc::new(avro_from_codec(Codec::Int32));
    let bad_width_type = AvroDataType::new(
        Codec::RunEndEncoded(value_type, 3),
        Default::default(),
        None,
    );
    let err = Decoder::try_new(&bad_width_type).expect_err("must reject unsupported width");
    let msg = err.to_string();
    let names_the_problem = msg.contains("Unsupported run-end width");
    let lists_supported_widths = msg.contains("16/32/64 bits or 2/4/8 bytes");
    assert!(
        names_the_problem && lists_supported_widths,
        "unexpected error message: {msg}"
    );
}
4594
/// Flushing a run-end-encoded decoder (declared with width 4) that decoded
/// nothing yields an empty `RunArray<Int32Type>`: no rows, no run ends, no
/// values.
#[cfg(feature = "avro_custom_types")]
#[test]
fn test_run_end_encoded_empty_input_is_empty_runarray() {
    use arrow_array::RunArray;
    use std::sync::Arc;

    let value_type = Arc::new(avro_from_codec(Codec::Utf8));
    let ree_type = AvroDataType::new(
        Codec::RunEndEncoded(value_type, 4),
        Default::default(),
        None,
    );
    let mut decoder = Decoder::try_new(&ree_type).expect("create REE decoder");

    // Flush straight away, without decoding any row.
    let flushed = decoder.flush(None).expect("flush");
    let runs = flushed
        .as_any()
        .downcast_ref::<RunArray<Int32Type>>()
        .expect("RunArray<Int32Type>");
    assert_eq!(runs.len(), 0);
    assert_eq!(runs.run_ends().len(), 0);
    assert_eq!(runs.values().len(), 0);
}
4616
/// Decoding `a,a,bb,bb,bb,a` through a width-32 run-end-encoded string
/// decoder yields run ends `[2, 5, 6]` over values `["a", "bb", "a"]`.
#[cfg(feature = "avro_custom_types")]
#[test]
fn test_run_end_encoded_strings_grouping_width32_bits() {
    use arrow_array::RunArray;
    use std::sync::Arc;

    let value_type = Arc::new(avro_from_codec(Codec::Utf8));
    let ree_type = AvroDataType::new(
        Codec::RunEndEncoded(value_type, 32),
        Default::default(),
        None,
    );
    let mut decoder = Decoder::try_new(&ree_type).expect("create REE decoder");

    let input = ["a", "a", "bb", "bb", "bb", "a"];
    for text in input {
        let encoded = encode_avro_bytes(text.as_bytes());
        let mut cursor = AvroCursor::new(&encoded);
        decoder.decode(&mut cursor).expect("decode");
    }

    let flushed = decoder.flush(None).expect("flush");
    let runs = flushed
        .as_any()
        .downcast_ref::<RunArray<Int32Type>>()
        .expect("RunArray<Int32Type>");
    assert_eq!(runs.run_ends().values(), &[2, 5, 6]);

    let run_values = runs
        .values()
        .as_ref()
        .as_any()
        .downcast_ref::<StringArray>()
        .expect("values String");
    assert_eq!(run_values.len(), 3);
    assert_eq!(run_values.value(0), "a");
    assert_eq!(run_values.value(1), "bb");
    assert_eq!(run_values.value(2), "a");
}
4650
/// Smoke test compiled only when `avro_custom_types` is disabled: a plain
/// Int32 decoder decodes `1, 2, 3`.
#[cfg(not(feature = "avro_custom_types"))]
#[test]
fn test_no_custom_types_feature_smoke_decodes_plain_int32() {
    let int_type = avro_from_codec(Codec::Int32);
    let mut decoder = Decoder::try_new(&int_type).expect("create Int32 decoder");

    let input = [1, 2, 3];
    for value in input {
        let encoded = encode_avro_int(value);
        let mut cursor = AvroCursor::new(&encoded);
        decoder.decode(&mut cursor).expect("decode");
    }

    let flushed = decoder.flush(None).expect("flush");
    let ints = flushed
        .as_any()
        .downcast_ref::<Int32Array>()
        .expect("Int32Array");
    assert_eq!(ints.values(), &[1, 2, 3]);
}
4667
/// `TimestampNanos(true)` decodes longs into a nanosecond timestamp array
/// whose timezone is `"+00:00"`.
#[test]
fn test_timestamp_nanos_decoding_utc() {
    let timestamp_type = avro_from_codec(Codec::TimestampNanos(true));
    let mut decoder = Decoder::try_new(&timestamp_type).expect("create TimestampNanos decoder");

    // All rows are concatenated into one buffer and read through one cursor.
    let samples = [0_i64, 1_i64, -1_i64, 1_234_567_890_i64];
    let mut encoded = Vec::new();
    for sample in samples {
        encoded.extend_from_slice(&encode_avro_long(sample));
    }
    let mut cursor = AvroCursor::new(&encoded);
    for _ in 0..4 {
        decoder.decode(&mut cursor).expect("decode nanos ts");
    }

    let flushed = decoder.flush(None).expect("flush nanos ts");
    let timestamps = flushed
        .as_any()
        .downcast_ref::<TimestampNanosecondArray>()
        .expect("TimestampNanosecondArray");
    assert_eq!(timestamps.values(), &[0, 1, -1, 1_234_567_890]);

    match timestamps.data_type() {
        DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, timezone) => {
            assert_eq!(timezone.as_deref(), Some("+00:00"));
        }
        other => panic!("expected Timestamp(Nanosecond, Some(\"+00:00\")), got {other:?}"),
    }
}
4693
/// `TimestampNanos(false)` decodes longs into a nanosecond timestamp array
/// that carries no timezone.
#[test]
fn test_timestamp_nanos_decoding_local() {
    let timestamp_type = avro_from_codec(Codec::TimestampNanos(false));
    let mut decoder = Decoder::try_new(&timestamp_type).expect("create TimestampNanos decoder");

    // All rows are concatenated into one buffer and read through one cursor.
    let samples = [10_i64, 20_i64, -30_i64];
    let mut encoded = Vec::new();
    for sample in samples {
        encoded.extend_from_slice(&encode_avro_long(sample));
    }
    let mut cursor = AvroCursor::new(&encoded);
    for _ in 0..3 {
        decoder.decode(&mut cursor).expect("decode nanos ts");
    }

    let flushed = decoder.flush(None).expect("flush nanos ts");
    let timestamps = flushed
        .as_any()
        .downcast_ref::<TimestampNanosecondArray>()
        .expect("TimestampNanosecondArray");
    assert_eq!(timestamps.values(), &[10, 20, -30]);

    match timestamps.data_type() {
        DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, timezone) => {
            assert_eq!(timezone.as_deref(), None);
        }
        other => panic!("expected Timestamp(Nanosecond, None), got {other:?}"),
    }
}
4719
/// A null-first nullable `TimestampNanos(false)` decoder reads three rows
/// (`42`, null, `-7`) from a tag-prefixed stream and reports no timezone.
#[test]
fn test_timestamp_nanos_decoding_with_nulls() {
    let nullable_timestamp = AvroDataType::new(
        Codec::TimestampNanos(false),
        Default::default(),
        Some(Nullability::NullFirst),
    );
    let mut decoder =
        Decoder::try_new(&nullable_timestamp).expect("create nullable TimestampNanos");

    // Stream layout: tag 1 + 42, tag 0 (null), tag 1 + -7.
    let mut encoded = Vec::new();
    for token in [1, 42, 0, 1, -7] {
        encoded.extend_from_slice(&encode_avro_long(token));
    }
    let mut cursor = AvroCursor::new(&encoded);
    for _ in 0..3 {
        decoder.decode(&mut cursor).expect("decode nullable nanos ts");
    }

    let flushed = decoder.flush(None).expect("flush nullable nanos ts");
    let timestamps = flushed
        .as_any()
        .downcast_ref::<TimestampNanosecondArray>()
        .expect("TimestampNanosecondArray");
    assert_eq!(timestamps.len(), 3);
    assert!(timestamps.is_valid(0));
    assert!(timestamps.is_null(1));
    assert!(timestamps.is_valid(2));
    assert_eq!(timestamps.value(0), 42);
    assert_eq!(timestamps.value(2), -7);

    match timestamps.data_type() {
        DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, timezone) => {
            assert_eq!(timezone.as_deref(), None);
        }
        other => panic!("expected Timestamp(Nanosecond, None), got {other:?}"),
    }
}
4756}