1use crate::codec::{
21 AvroDataType, AvroField, AvroLiteral, Codec, Promotion, ResolutionInfo, ResolvedRecord,
22 ResolvedUnion,
23};
24use crate::reader::cursor::AvroCursor;
25use crate::schema::Nullability;
26#[cfg(feature = "small_decimals")]
27use arrow_array::builder::{Decimal32Builder, Decimal64Builder};
28use arrow_array::builder::{Decimal128Builder, Decimal256Builder, IntervalMonthDayNanoBuilder};
29use arrow_array::types::*;
30use arrow_array::*;
31use arrow_buffer::*;
32use arrow_schema::{
33 ArrowError, DECIMAL128_MAX_PRECISION, DECIMAL256_MAX_PRECISION, DataType, Field as ArrowField,
34 FieldRef, Fields, Schema as ArrowSchema, SchemaRef, UnionFields, UnionMode,
35};
36#[cfg(feature = "small_decimals")]
37use arrow_schema::{DECIMAL32_MAX_PRECISION, DECIMAL64_MAX_PRECISION};
38#[cfg(feature = "avro_custom_types")]
39use arrow_select::take::{TakeOptions, take};
40use std::cmp::Ordering;
41use std::sync::Arc;
42use strum_macros::AsRefStr;
43use uuid::Uuid;
44
/// Initial capacity requested for the value buffers and builders that the decoders below
/// allocate up front.
const DEFAULT_CAPACITY: usize = 1024;
46
/// How a `Decoder::Nullable` obtains the next value from the input.
#[derive(Clone, Copy, Debug)]
enum NullablePlan {
    /// Read a branch tag from the input first; the tag, together with the null ordering,
    /// decides whether a value is decoded or a null is appended.
    ReadTag,
    /// Read no tag: decode the value directly with `promotion` and mark the slot as valid.
    /// Chosen when the union resolution info reports a non-union writer and a union reader.
    FromSingle { promotion: Promotion },
}
56
/// Decodes one decimal value from the cursor and appends it to a decimal builder.
///
/// Arguments, in order: the optional fixed size, the cursor, the target builder, the byte
/// width of the native integer, and the native integer type. The bytes come from
/// `read_decimal_bytes_be` (defined elsewhere in this file) and are interpreted as a
/// big-endian integer. Errors are propagated with `?` out of the enclosing function.
macro_rules! decode_decimal {
    ($fixed_size:expr, $cursor:expr, $out:expr, $WIDTH:expr, $Native:ty) => {{
        let be_bytes = read_decimal_bytes_be::<{ $WIDTH }>($cursor, $fixed_size)?;
        let value = <$Native>::from_be_bytes(be_bytes);
        $out.append_value(value);
    }};
}
64
/// Finishes a decimal builder and wraps its values in an `ArrayRef` of type `$Array`.
///
/// Arguments, in order: the builder, a reference to the precision, the optional scale
/// (a missing scale is treated as 0), the null buffer to attach, and the concrete array
/// type. Only the values are taken from the finished builder; the validity comes from the
/// supplied null buffer. A precision/scale rejection is reported as
/// `ArrowError::ParseError`; all errors leave the enclosing function through `?`.
macro_rules! flush_decimal {
    ($src:expr, $prec:expr, $scl:expr, $validity:expr, $Array:ty) => {{
        let (_, values, _) = $src.finish().into_parts();
        let unscaled = <$Array>::try_new(values, $validity)?;
        let precision = *$prec as u8;
        let scale = $scl.unwrap_or(0) as i8;
        let typed = unscaled
            .with_precision_and_scale(precision, scale)
            .map_err(|err| ArrowError::ParseError(err.to_string()))?;
        let array: ArrayRef = Arc::new(typed);
        array
    }};
}
75
/// Appends a schema default to a decimal builder.
///
/// Arguments, in order: the default literal, the target builder, the byte width of the
/// native integer, the native integer type, and a display name used in the error message.
/// A `Bytes` literal is widened to `$WIDTH` bytes by `sign_cast_to` (defined elsewhere in
/// this file) and read as a big-endian integer; any other literal kind evaluates to an
/// `InvalidArgumentError`. The macro evaluates to a `Result<(), ArrowError>`.
macro_rules! append_decimal_default {
    ($literal:expr, $out:expr, $WIDTH:literal, $Native:ty, $label:literal) => {{
        if let AvroLiteral::Bytes(raw) = $literal {
            let widened = sign_cast_to::<$WIDTH>(raw)?;
            $out.append_value(<$Native>::from_be_bytes(widened));
            Ok(())
        } else {
            Err(ArrowError::InvalidArgumentError(
                concat!(
                    "Default for ",
                    $label,
                    " must be bytes (two's-complement big-endian)"
                )
                .to_string(),
            ))
        }
    }};
}
98
/// Decodes Avro-encoded records column by column and assembles the columns into a
/// `RecordBatch`.
#[derive(Debug)]
pub(crate) struct RecordDecoder {
    /// Arrow schema built from the reader record's fields; attached to every flushed batch.
    schema: SchemaRef,
    /// One decoder per reader field, in field order.
    fields: Vec<Decoder>,
    /// Set only when the data type carries record resolution info; when present it decodes
    /// each record in place of the plain field-by-field loop.
    projector: Option<Projector>,
}
106
107impl RecordDecoder {
108 pub(crate) fn try_new_with_options(data_type: &AvroDataType) -> Result<Self, ArrowError> {
119 match data_type.codec() {
120 Codec::Struct(reader_fields) => {
121 let mut arrow_fields = Vec::with_capacity(reader_fields.len());
123 let mut encodings = Vec::with_capacity(reader_fields.len());
124 for avro_field in reader_fields.iter() {
125 arrow_fields.push(avro_field.field());
126 encodings.push(Decoder::try_new(avro_field.data_type())?);
127 }
128 let projector = match data_type.resolution.as_ref() {
129 Some(ResolutionInfo::Record(rec)) => {
130 Some(ProjectorBuilder::try_new(rec, reader_fields).build()?)
131 }
132 _ => None,
133 };
134 Ok(Self {
135 schema: Arc::new(ArrowSchema::new(arrow_fields)),
136 fields: encodings,
137 projector,
138 })
139 }
140 other => Err(ArrowError::ParseError(format!(
141 "Expected record got {other:?}"
142 ))),
143 }
144 }
145
146 pub(crate) fn schema(&self) -> &SchemaRef {
148 &self.schema
149 }
150
151 pub(crate) fn decode(&mut self, buf: &[u8], count: usize) -> Result<usize, ArrowError> {
153 let mut cursor = AvroCursor::new(buf);
154 match self.projector.as_mut() {
155 Some(proj) => {
156 for _ in 0..count {
157 proj.project_record(&mut cursor, &mut self.fields)?;
158 }
159 }
160 None => {
161 for _ in 0..count {
162 for field in &mut self.fields {
163 field.decode(&mut cursor)?;
164 }
165 }
166 }
167 }
168 Ok(cursor.position())
169 }
170
171 pub(crate) fn flush(&mut self) -> Result<RecordBatch, ArrowError> {
173 let arrays = self
174 .fields
175 .iter_mut()
176 .map(|x| x.flush(None))
177 .collect::<Result<Vec<_>, _>>()?;
178 RecordBatch::try_new(self.schema.clone(), arrays)
179 }
180}
181
/// Index translation applied to enum values while decoding.
#[derive(Debug)]
struct EnumResolution {
    /// Looked up with the enum index read from the input; the entry is the index to store.
    /// Negative entries are treated as "no mapping".
    mapping: Arc<[i32]>,
    /// Index stored when the read index is out of range or has no mapping. A negative
    /// value means there is no default, and decoding such a value fails.
    default_index: i32,
}
187
/// Per-column decoding state: each variant holds the buffers or builders that accumulate
/// decoded values until the column is flushed.
///
/// `AsRefStr` provides the variant name used in promotion mismatch error messages.
#[derive(Debug, AsRefStr)]
enum Decoder {
    /// Number of nulls seen since the last flush.
    Null(usize),
    /// Boolean values.
    Boolean(BooleanBufferBuilder),
    /// Values read as Avro `int`.
    Int32(Vec<i32>),
    /// Values read as Avro `long`.
    Int64(Vec<i64>),
    /// Duration values read as `long` (custom type).
    #[cfg(feature = "avro_custom_types")]
    DurationSecond(Vec<i64>),
    /// Duration values read as `long` (custom type).
    #[cfg(feature = "avro_custom_types")]
    DurationMillisecond(Vec<i64>),
    /// Duration values read as `long` (custom type).
    #[cfg(feature = "avro_custom_types")]
    DurationMicrosecond(Vec<i64>),
    /// Duration values read as `long` (custom type).
    #[cfg(feature = "avro_custom_types")]
    DurationNanosecond(Vec<i64>),
    /// Values read as Avro `float`.
    Float32(Vec<f32>),
    /// Values read as Avro `double`.
    Float64(Vec<f64>),
    /// Values read as `int`, flushed as `Date32`.
    Date32(Vec<i32>),
    /// Values read as `int`, flushed as `Time32` in milliseconds.
    TimeMillis(Vec<i32>),
    /// Values read as `long`, flushed as `Time64` in microseconds.
    TimeMicros(Vec<i64>),
    /// `(is_utc, values)`: values read as `long`; a UTC timezone is attached on flush when
    /// the flag is set.
    TimestampMillis(bool, Vec<i64>),
    /// `(is_utc, values)`: values read as `long`; a UTC timezone is attached on flush when
    /// the flag is set.
    TimestampMicros(bool, Vec<i64>),
    /// Reads an `int` and stores it widened to `i64`.
    Int32ToInt64(Vec<i64>),
    /// Reads an `int` and stores it converted to `f32`.
    Int32ToFloat32(Vec<f32>),
    /// Reads an `int` and stores it converted to `f64`.
    Int32ToFloat64(Vec<f64>),
    /// Reads a `long` and stores it converted to `f32`.
    Int64ToFloat32(Vec<f32>),
    /// Reads a `long` and stores it converted to `f64`.
    Int64ToFloat64(Vec<f64>),
    /// Reads a `float` and stores it widened to `f64`.
    Float32ToFloat64(Vec<f64>),
    /// `(offsets, data)` for a writer `bytes` value resolved to a reader string.
    BytesToString(OffsetBufferBuilder<i32>, Vec<u8>),
    /// `(offsets, data)` for a writer string value resolved to reader `bytes`.
    StringToBytes(OffsetBufferBuilder<i32>, Vec<u8>),
    /// `(offsets, data)` for binary values.
    Binary(OffsetBufferBuilder<i32>, Vec<u8>),
    /// `(offsets, data)` for string values.
    String(OffsetBufferBuilder<i32>, Vec<u8>),
    /// `(offsets, data)` for string values whose codec is `Utf8View`.
    StringView(OffsetBufferBuilder<i32>, Vec<u8>),
    /// `(item field, list offsets, item decoder)`.
    Array(FieldRef, OffsetBufferBuilder<i32>, Box<Decoder>),
    /// `(fields, one decoder per field, optional projector)`; when a projector is present
    /// it decodes the record instead of the plain field loop.
    Record(Fields, Vec<Decoder>, Option<Projector>),
    /// `(entries field, key offsets, map offsets, key bytes, value decoder)`.
    Map(
        FieldRef,
        OffsetBufferBuilder<i32>,
        OffsetBufferBuilder<i32>,
        Vec<u8>,
        Box<Decoder>,
    ),
    /// `(size in bytes, concatenated values)`.
    Fixed(i32, Vec<u8>),
    /// `(stored indices, reader symbols, optional writer-to-reader resolution)`.
    Enum(Vec<i32>, Arc<[String]>, Option<EnumResolution>),
    /// Intervals decoded from a 12-byte fixed (little-endian months, days, milliseconds).
    Duration(IntervalMonthDayNanoBuilder),
    /// Concatenated 16-byte UUID values parsed from their textual form.
    Uuid(Vec<u8>),
    /// `(precision, scale, fixed size, builder)`.
    #[cfg(feature = "small_decimals")]
    Decimal32(usize, Option<usize>, Option<usize>, Decimal32Builder),
    /// `(precision, scale, fixed size, builder)`.
    #[cfg(feature = "small_decimals")]
    Decimal64(usize, Option<usize>, Option<usize>, Decimal64Builder),
    /// `(precision, scale, fixed size, builder)`.
    Decimal128(usize, Option<usize>, Option<usize>, Decimal128Builder),
    /// `(precision, scale, fixed size, builder)`.
    Decimal256(usize, Option<usize>, Option<usize>, Decimal256Builder),
    /// `(run-end width in bytes, number of values appended, values decoder)`.
    #[cfg(feature = "avro_custom_types")]
    RunEndEncoded(u8, usize, Box<Decoder>),
    /// Union decoding state.
    Union(UnionDecoder),
    /// `(null ordering, validity builder, inner decoder, decoding plan)`.
    Nullable(Nullability, NullBufferBuilder, Box<Decoder>, NullablePlan),
}
246
247impl Decoder {
248 fn try_new(data_type: &AvroDataType) -> Result<Self, ArrowError> {
249 if let Some(ResolutionInfo::Union(info)) = data_type.resolution.as_ref() {
250 if info.writer_is_union && !info.reader_is_union {
251 let mut clone = data_type.clone();
252 clone.resolution = None; let target = Box::new(Self::try_new_internal(&clone)?);
254 let decoder = Self::Union(
255 UnionDecoderBuilder::new()
256 .with_resolved_union(info.clone())
257 .with_target(target)
258 .build()?,
259 );
260 return Ok(decoder);
261 }
262 }
263 Self::try_new_internal(data_type)
264 }
265
266 fn try_new_internal(data_type: &AvroDataType) -> Result<Self, ArrowError> {
267 let promotion = match data_type.resolution.as_ref() {
269 Some(ResolutionInfo::Promotion(p)) => Some(p),
270 _ => None,
271 };
272 let decoder = match (data_type.codec(), promotion) {
273 (Codec::Int64, Some(Promotion::IntToLong)) => {
274 Self::Int32ToInt64(Vec::with_capacity(DEFAULT_CAPACITY))
275 }
276 (Codec::Float32, Some(Promotion::IntToFloat)) => {
277 Self::Int32ToFloat32(Vec::with_capacity(DEFAULT_CAPACITY))
278 }
279 (Codec::Float64, Some(Promotion::IntToDouble)) => {
280 Self::Int32ToFloat64(Vec::with_capacity(DEFAULT_CAPACITY))
281 }
282 (Codec::Float32, Some(Promotion::LongToFloat)) => {
283 Self::Int64ToFloat32(Vec::with_capacity(DEFAULT_CAPACITY))
284 }
285 (Codec::Float64, Some(Promotion::LongToDouble)) => {
286 Self::Int64ToFloat64(Vec::with_capacity(DEFAULT_CAPACITY))
287 }
288 (Codec::Float64, Some(Promotion::FloatToDouble)) => {
289 Self::Float32ToFloat64(Vec::with_capacity(DEFAULT_CAPACITY))
290 }
291 (Codec::Utf8, Some(Promotion::BytesToString))
292 | (Codec::Utf8View, Some(Promotion::BytesToString)) => Self::BytesToString(
293 OffsetBufferBuilder::new(DEFAULT_CAPACITY),
294 Vec::with_capacity(DEFAULT_CAPACITY),
295 ),
296 (Codec::Binary, Some(Promotion::StringToBytes)) => Self::StringToBytes(
297 OffsetBufferBuilder::new(DEFAULT_CAPACITY),
298 Vec::with_capacity(DEFAULT_CAPACITY),
299 ),
300 (Codec::Null, _) => Self::Null(0),
301 (Codec::Boolean, _) => Self::Boolean(BooleanBufferBuilder::new(DEFAULT_CAPACITY)),
302 (Codec::Int32, _) => Self::Int32(Vec::with_capacity(DEFAULT_CAPACITY)),
303 (Codec::Int64, _) => Self::Int64(Vec::with_capacity(DEFAULT_CAPACITY)),
304 (Codec::Float32, _) => Self::Float32(Vec::with_capacity(DEFAULT_CAPACITY)),
305 (Codec::Float64, _) => Self::Float64(Vec::with_capacity(DEFAULT_CAPACITY)),
306 (Codec::Binary, _) => Self::Binary(
307 OffsetBufferBuilder::new(DEFAULT_CAPACITY),
308 Vec::with_capacity(DEFAULT_CAPACITY),
309 ),
310 (Codec::Utf8, _) => Self::String(
311 OffsetBufferBuilder::new(DEFAULT_CAPACITY),
312 Vec::with_capacity(DEFAULT_CAPACITY),
313 ),
314 (Codec::Utf8View, _) => Self::StringView(
315 OffsetBufferBuilder::new(DEFAULT_CAPACITY),
316 Vec::with_capacity(DEFAULT_CAPACITY),
317 ),
318 (Codec::Date32, _) => Self::Date32(Vec::with_capacity(DEFAULT_CAPACITY)),
319 (Codec::TimeMillis, _) => Self::TimeMillis(Vec::with_capacity(DEFAULT_CAPACITY)),
320 (Codec::TimeMicros, _) => Self::TimeMicros(Vec::with_capacity(DEFAULT_CAPACITY)),
321 (Codec::TimestampMillis(is_utc), _) => {
322 Self::TimestampMillis(*is_utc, Vec::with_capacity(DEFAULT_CAPACITY))
323 }
324 (Codec::TimestampMicros(is_utc), _) => {
325 Self::TimestampMicros(*is_utc, Vec::with_capacity(DEFAULT_CAPACITY))
326 }
327 #[cfg(feature = "avro_custom_types")]
328 (Codec::DurationNanos, _) => {
329 Self::DurationNanosecond(Vec::with_capacity(DEFAULT_CAPACITY))
330 }
331 #[cfg(feature = "avro_custom_types")]
332 (Codec::DurationMicros, _) => {
333 Self::DurationMicrosecond(Vec::with_capacity(DEFAULT_CAPACITY))
334 }
335 #[cfg(feature = "avro_custom_types")]
336 (Codec::DurationMillis, _) => {
337 Self::DurationMillisecond(Vec::with_capacity(DEFAULT_CAPACITY))
338 }
339 #[cfg(feature = "avro_custom_types")]
340 (Codec::DurationSeconds, _) => {
341 Self::DurationSecond(Vec::with_capacity(DEFAULT_CAPACITY))
342 }
343 (Codec::Fixed(sz), _) => Self::Fixed(*sz, Vec::with_capacity(DEFAULT_CAPACITY)),
344 (Codec::Decimal(precision, scale, size), _) => {
345 let p = *precision;
346 let s = *scale;
347 let prec = p as u8;
348 let scl = s.unwrap_or(0) as i8;
349 #[cfg(feature = "small_decimals")]
350 {
351 if p <= DECIMAL32_MAX_PRECISION as usize {
352 let builder = Decimal32Builder::with_capacity(DEFAULT_CAPACITY)
353 .with_precision_and_scale(prec, scl)?;
354 Self::Decimal32(p, s, *size, builder)
355 } else if p <= DECIMAL64_MAX_PRECISION as usize {
356 let builder = Decimal64Builder::with_capacity(DEFAULT_CAPACITY)
357 .with_precision_and_scale(prec, scl)?;
358 Self::Decimal64(p, s, *size, builder)
359 } else if p <= DECIMAL128_MAX_PRECISION as usize {
360 let builder = Decimal128Builder::with_capacity(DEFAULT_CAPACITY)
361 .with_precision_and_scale(prec, scl)?;
362 Self::Decimal128(p, s, *size, builder)
363 } else if p <= DECIMAL256_MAX_PRECISION as usize {
364 let builder = Decimal256Builder::with_capacity(DEFAULT_CAPACITY)
365 .with_precision_and_scale(prec, scl)?;
366 Self::Decimal256(p, s, *size, builder)
367 } else {
368 return Err(ArrowError::ParseError(format!(
369 "Decimal precision {p} exceeds maximum supported"
370 )));
371 }
372 }
373 #[cfg(not(feature = "small_decimals"))]
374 {
375 if p <= DECIMAL128_MAX_PRECISION as usize {
376 let builder = Decimal128Builder::with_capacity(DEFAULT_CAPACITY)
377 .with_precision_and_scale(prec, scl)?;
378 Self::Decimal128(p, s, *size, builder)
379 } else if p <= DECIMAL256_MAX_PRECISION as usize {
380 let builder = Decimal256Builder::with_capacity(DEFAULT_CAPACITY)
381 .with_precision_and_scale(prec, scl)?;
382 Self::Decimal256(p, s, *size, builder)
383 } else {
384 return Err(ArrowError::ParseError(format!(
385 "Decimal precision {p} exceeds maximum supported"
386 )));
387 }
388 }
389 }
390 (Codec::Interval, _) => Self::Duration(IntervalMonthDayNanoBuilder::new()),
391 (Codec::List(item), _) => {
392 let decoder = Self::try_new(item)?;
393 Self::Array(
394 Arc::new(item.field_with_name("item")),
395 OffsetBufferBuilder::new(DEFAULT_CAPACITY),
396 Box::new(decoder),
397 )
398 }
399 (Codec::Enum(symbols), _) => {
400 let res = match data_type.resolution.as_ref() {
401 Some(ResolutionInfo::EnumMapping(mapping)) => Some(EnumResolution {
402 mapping: mapping.mapping.clone(),
403 default_index: mapping.default_index,
404 }),
405 _ => None,
406 };
407 Self::Enum(Vec::with_capacity(DEFAULT_CAPACITY), symbols.clone(), res)
408 }
409 (Codec::Struct(fields), _) => {
410 let mut arrow_fields = Vec::with_capacity(fields.len());
411 let mut encodings = Vec::with_capacity(fields.len());
412 for avro_field in fields.iter() {
413 let encoding = Self::try_new(avro_field.data_type())?;
414 arrow_fields.push(avro_field.field());
415 encodings.push(encoding);
416 }
417 let projector =
418 if let Some(ResolutionInfo::Record(rec)) = data_type.resolution.as_ref() {
419 Some(ProjectorBuilder::try_new(rec, fields).build()?)
420 } else {
421 None
422 };
423 Self::Record(arrow_fields.into(), encodings, projector)
424 }
425 (Codec::Map(child), _) => {
426 let val_field = child.field_with_name("value");
427 let map_field = Arc::new(ArrowField::new(
428 "entries",
429 DataType::Struct(Fields::from(vec![
430 ArrowField::new("key", DataType::Utf8, false),
431 val_field,
432 ])),
433 false,
434 ));
435 let val_dec = Self::try_new(child)?;
436 Self::Map(
437 map_field,
438 OffsetBufferBuilder::new(DEFAULT_CAPACITY),
439 OffsetBufferBuilder::new(DEFAULT_CAPACITY),
440 Vec::with_capacity(DEFAULT_CAPACITY),
441 Box::new(val_dec),
442 )
443 }
444 (Codec::Uuid, _) => Self::Uuid(Vec::with_capacity(DEFAULT_CAPACITY)),
445 (Codec::Union(encodings, fields, UnionMode::Dense), _) => {
446 let decoders = encodings
447 .iter()
448 .map(Self::try_new_internal)
449 .collect::<Result<Vec<_>, _>>()?;
450 if fields.len() != decoders.len() {
451 return Err(ArrowError::SchemaError(format!(
452 "Union has {} fields but {} decoders",
453 fields.len(),
454 decoders.len()
455 )));
456 }
457 let branch_count = decoders.len();
460 let max_addr = (i32::MAX as usize) + 1;
461 if branch_count > max_addr {
462 return Err(ArrowError::SchemaError(format!(
463 "Union has {branch_count} branches, which exceeds the maximum addressable \
464 branches by an Avro int tag ({} + 1).",
465 i32::MAX
466 )));
467 }
468 let mut builder = UnionDecoderBuilder::new()
469 .with_fields(fields.clone())
470 .with_branches(decoders);
471 if let Some(ResolutionInfo::Union(info)) = data_type.resolution.as_ref() {
472 if info.reader_is_union {
473 builder = builder.with_resolved_union(info.clone());
474 }
475 }
476 Self::Union(builder.build()?)
477 }
478 (Codec::Union(_, _, _), _) => {
479 return Err(ArrowError::NotYetImplemented(
480 "Sparse Arrow unions are not yet supported".to_string(),
481 ));
482 }
483 #[cfg(feature = "avro_custom_types")]
484 (Codec::RunEndEncoded(values_dt, width_bits_or_bytes), _) => {
485 let inner = Self::try_new(values_dt)?;
486 let byte_width: u8 = match *width_bits_or_bytes {
487 2 | 4 | 8 => *width_bits_or_bytes,
488 16 => 2,
489 32 => 4,
490 64 => 8,
491 other => {
492 return Err(ArrowError::InvalidArgumentError(format!(
493 "Unsupported run-end width {other} for RunEndEncoded; \
494 expected 16/32/64 bits or 2/4/8 bytes"
495 )));
496 }
497 };
498 Self::RunEndEncoded(byte_width, 0, Box::new(inner))
499 }
500 };
501 Ok(match data_type.nullability() {
502 Some(nullability) => {
503 let mut plan = NullablePlan::ReadTag;
505 if let Some(ResolutionInfo::Union(info)) = data_type.resolution.as_ref() {
506 if !info.writer_is_union && info.reader_is_union {
507 if let Some(Some((_reader_idx, promo))) = info.writer_to_reader.first() {
508 plan = NullablePlan::FromSingle { promotion: *promo };
509 }
510 }
511 }
512 Self::Nullable(
513 nullability,
514 NullBufferBuilder::new(DEFAULT_CAPACITY),
515 Box::new(decoder),
516 plan,
517 )
518 }
519 None => decoder,
520 })
521 }
522
523 fn append_null(&mut self) -> Result<(), ArrowError> {
525 match self {
526 Self::Null(count) => *count += 1,
527 Self::Boolean(b) => b.append(false),
528 Self::Int32(v) | Self::Date32(v) | Self::TimeMillis(v) => v.push(0),
529 Self::Int64(v)
530 | Self::Int32ToInt64(v)
531 | Self::TimeMicros(v)
532 | Self::TimestampMillis(_, v)
533 | Self::TimestampMicros(_, v) => v.push(0),
534 #[cfg(feature = "avro_custom_types")]
535 Self::DurationSecond(v)
536 | Self::DurationMillisecond(v)
537 | Self::DurationMicrosecond(v)
538 | Self::DurationNanosecond(v) => v.push(0),
539 Self::Float32(v) | Self::Int32ToFloat32(v) | Self::Int64ToFloat32(v) => v.push(0.),
540 Self::Float64(v)
541 | Self::Int32ToFloat64(v)
542 | Self::Int64ToFloat64(v)
543 | Self::Float32ToFloat64(v) => v.push(0.),
544 Self::Binary(offsets, _)
545 | Self::String(offsets, _)
546 | Self::StringView(offsets, _)
547 | Self::BytesToString(offsets, _)
548 | Self::StringToBytes(offsets, _) => {
549 offsets.push_length(0);
550 }
551 Self::Uuid(v) => {
552 v.extend([0; 16]);
553 }
554 Self::Array(_, offsets, _) => {
555 offsets.push_length(0);
556 }
557 Self::Record(_, e, _) => {
558 for encoding in e.iter_mut() {
559 encoding.append_null()?;
560 }
561 }
562 Self::Map(_, _koff, moff, _, _) => {
563 moff.push_length(0);
564 }
565 Self::Fixed(sz, accum) => {
566 accum.extend(std::iter::repeat_n(0u8, *sz as usize));
567 }
568 #[cfg(feature = "small_decimals")]
569 Self::Decimal32(_, _, _, builder) => builder.append_value(0),
570 #[cfg(feature = "small_decimals")]
571 Self::Decimal64(_, _, _, builder) => builder.append_value(0),
572 Self::Decimal128(_, _, _, builder) => builder.append_value(0),
573 Self::Decimal256(_, _, _, builder) => builder.append_value(i256::ZERO),
574 Self::Enum(indices, _, _) => indices.push(0),
575 Self::Duration(builder) => builder.append_null(),
576 #[cfg(feature = "avro_custom_types")]
577 Self::RunEndEncoded(_, len, inner) => {
578 *len += 1;
579 inner.append_null()?;
580 }
581 Self::Union(u) => u.append_null()?,
582 Self::Nullable(_, null_buffer, inner, _) => {
583 null_buffer.append(false);
584 inner.append_null()?;
585 }
586 }
587 Ok(())
588 }
589
/// Appends the schema default `lit` as the next value of this decoder.
///
/// Each variant accepts only the literal kinds listed in its arm; any other kind yields
/// `ArrowError::InvalidArgumentError`. Wrapper and container decoders recurse into their
/// children.
fn append_default(&mut self, lit: &AvroLiteral) -> Result<(), ArrowError> {
    match self {
        // A null literal becomes an invalid slot; anything else is a valid slot whose value
        // is appended by the inner decoder.
        Self::Nullable(_, nb, inner, _) => {
            if matches!(lit, AvroLiteral::Null) {
                nb.append(false);
                inner.append_null()
            } else {
                nb.append(true);
                inner.append_default(lit)
            }
        }
        Self::Null(count) => match lit {
            AvroLiteral::Null => {
                *count += 1;
                Ok(())
            }
            _ => Err(ArrowError::InvalidArgumentError(
                "Non-null default for null type".to_string(),
            )),
        },
        Self::Boolean(b) => match lit {
            AvroLiteral::Boolean(v) => {
                b.append(*v);
                Ok(())
            }
            _ => Err(ArrowError::InvalidArgumentError(
                "Default for boolean must be boolean".to_string(),
            )),
        },
        Self::Int32(v) | Self::Date32(v) | Self::TimeMillis(v) => match lit {
            AvroLiteral::Int(i) => {
                v.push(*i);
                Ok(())
            }
            _ => Err(ArrowError::InvalidArgumentError(
                "Default for int32/date32/time-millis must be int".to_string(),
            )),
        },
        #[cfg(feature = "avro_custom_types")]
        Self::DurationSecond(v)
        | Self::DurationMillisecond(v)
        | Self::DurationMicrosecond(v)
        | Self::DurationNanosecond(v) => match lit {
            AvroLiteral::Long(i) => {
                v.push(*i);
                Ok(())
            }
            _ => Err(ArrowError::InvalidArgumentError(
                "Default for duration long must be long".to_string(),
            )),
        },
        // Long-backed values also accept an int literal, which is widened.
        Self::Int64(v)
        | Self::Int32ToInt64(v)
        | Self::TimeMicros(v)
        | Self::TimestampMillis(_, v)
        | Self::TimestampMicros(_, v) => match lit {
            AvroLiteral::Long(i) => {
                v.push(*i);
                Ok(())
            }
            AvroLiteral::Int(i) => {
                v.push(*i as i64);
                Ok(())
            }
            _ => Err(ArrowError::InvalidArgumentError(
                "Default for long/time-micros/timestamp must be long or int".to_string(),
            )),
        },
        Self::Float32(v) | Self::Int32ToFloat32(v) | Self::Int64ToFloat32(v) => match lit {
            AvroLiteral::Float(f) => {
                v.push(*f);
                Ok(())
            }
            _ => Err(ArrowError::InvalidArgumentError(
                "Default for float must be float".to_string(),
            )),
        },
        Self::Float64(v)
        | Self::Int32ToFloat64(v)
        | Self::Int64ToFloat64(v)
        | Self::Float32ToFloat64(v) => match lit {
            AvroLiteral::Double(f) => {
                v.push(*f);
                Ok(())
            }
            _ => Err(ArrowError::InvalidArgumentError(
                "Default for double must be double".to_string(),
            )),
        },
        Self::Binary(offsets, values) | Self::StringToBytes(offsets, values) => match lit {
            AvroLiteral::Bytes(b) => {
                offsets.push_length(b.len());
                values.extend_from_slice(b);
                Ok(())
            }
            _ => Err(ArrowError::InvalidArgumentError(
                "Default for bytes must be bytes".to_string(),
            )),
        },
        Self::BytesToString(offsets, values)
        | Self::String(offsets, values)
        | Self::StringView(offsets, values) => match lit {
            AvroLiteral::String(s) => {
                let b = s.as_bytes();
                offsets.push_length(b.len());
                values.extend_from_slice(b);
                Ok(())
            }
            _ => Err(ArrowError::InvalidArgumentError(
                "Default for string must be string".to_string(),
            )),
        },
        // The textual UUID is parsed and stored as its raw bytes.
        Self::Uuid(values) => match lit {
            AvroLiteral::String(s) => {
                let uuid = Uuid::try_parse(s).map_err(|e| {
                    ArrowError::InvalidArgumentError(format!("Invalid UUID default: {s} ({e})"))
                })?;
                values.extend_from_slice(uuid.as_bytes());
                Ok(())
            }
            _ => Err(ArrowError::InvalidArgumentError(
                "Default for uuid must be string".to_string(),
            )),
        },
        // The default must have exactly the declared fixed size.
        Self::Fixed(sz, accum) => match lit {
            AvroLiteral::Bytes(b) => {
                if b.len() != *sz as usize {
                    return Err(ArrowError::InvalidArgumentError(format!(
                        "Fixed default length {} does not match size {sz}",
                        b.len(),
                    )));
                }
                accum.extend_from_slice(b);
                Ok(())
            }
            _ => Err(ArrowError::InvalidArgumentError(
                "Default for fixed must be bytes".to_string(),
            )),
        },
        // Decimal defaults are handled by `append_decimal_default!`, which requires a bytes
        // literal and converts it to the builder's native integer.
        #[cfg(feature = "small_decimals")]
        Self::Decimal32(_, _, _, builder) => {
            append_decimal_default!(lit, builder, 4, i32, "decimal32")
        }
        #[cfg(feature = "small_decimals")]
        Self::Decimal64(_, _, _, builder) => {
            append_decimal_default!(lit, builder, 8, i64, "decimal64")
        }
        Self::Decimal128(_, _, _, builder) => {
            append_decimal_default!(lit, builder, 16, i128, "decimal128")
        }
        Self::Decimal256(_, _, _, builder) => {
            append_decimal_default!(lit, builder, 32, i256, "decimal256")
        }
        // 12 bytes: three little-endian u32 values (months, days, milliseconds).
        Self::Duration(builder) => match lit {
            AvroLiteral::Bytes(b) => {
                if b.len() != 12 {
                    return Err(ArrowError::InvalidArgumentError(format!(
                        "Duration default must be exactly 12 bytes, got {}",
                        b.len()
                    )));
                }
                let months = u32::from_le_bytes([b[0], b[1], b[2], b[3]]);
                let days = u32::from_le_bytes([b[4], b[5], b[6], b[7]]);
                let millis = u32::from_le_bytes([b[8], b[9], b[10], b[11]]);
                // Milliseconds are stored as nanoseconds in the interval value.
                let nanos = (millis as i64) * 1_000_000;
                builder.append_value(IntervalMonthDayNano::new(
                    months as i32,
                    days as i32,
                    nanos,
                ));
                Ok(())
            }
            _ => Err(ArrowError::InvalidArgumentError(
                "Default for duration must be 12-byte little-endian months/days/millis"
                    .to_string(),
            )),
        },
        // One list entry whose items are appended through the item decoder.
        Self::Array(_, offsets, inner) => match lit {
            AvroLiteral::Array(items) => {
                offsets.push_length(items.len());
                for item in items {
                    inner.append_default(item)?;
                }
                Ok(())
            }
            _ => Err(ArrowError::InvalidArgumentError(
                "Default for array must be an array literal".to_string(),
            )),
        },
        // One map entry: every key goes to the key buffers, every value to the value decoder.
        Self::Map(_, koff, moff, kdata, valdec) => match lit {
            AvroLiteral::Map(entries) => {
                moff.push_length(entries.len());
                for (k, v) in entries {
                    let kb = k.as_bytes();
                    koff.push_length(kb.len());
                    kdata.extend_from_slice(kb);
                    valdec.append_default(v)?;
                }
                Ok(())
            }
            _ => Err(ArrowError::InvalidArgumentError(
                "Default for map must be a map/object literal".to_string(),
            )),
        },
        // The symbol is stored as its position in the reader's symbol list.
        Self::Enum(indices, symbols, _) => match lit {
            AvroLiteral::Enum(sym) => {
                let pos = symbols.iter().position(|s| s == sym).ok_or_else(|| {
                    ArrowError::InvalidArgumentError(format!(
                        "Enum default symbol {sym:?} not in reader symbols"
                    ))
                })?;
                indices.push(pos as i32);
                Ok(())
            }
            _ => Err(ArrowError::InvalidArgumentError(
                "Default for enum must be a symbol".to_string(),
            )),
        },
        #[cfg(feature = "avro_custom_types")]
        Self::RunEndEncoded(_, len, inner) => {
            *len += 1;
            inner.append_default(lit)
        }
        Self::Union(u) => u.append_default(lit),
        Self::Record(field_meta, decoders, projector) => match lit {
            // Per field: use the entry named after the field if present, otherwise the
            // projector's default for that field, otherwise a null.
            AvroLiteral::Map(entries) => {
                for (i, dec) in decoders.iter_mut().enumerate() {
                    let name = field_meta[i].name();
                    if let Some(sub) = entries.get(name) {
                        dec.append_default(sub)?;
                    } else if let Some(proj) = projector.as_ref() {
                        proj.project_default(dec, i)?;
                    } else {
                        dec.append_null()?;
                    }
                }
                Ok(())
            }
            // A null record default fills every field from the projector, or with nulls
            // when there is no projector.
            AvroLiteral::Null => {
                for (i, dec) in decoders.iter_mut().enumerate() {
                    if let Some(proj) = projector.as_ref() {
                        proj.project_default(dec, i)?;
                    } else {
                        dec.append_null()?;
                    }
                }
                Ok(())
            }
            _ => Err(ArrowError::InvalidArgumentError(
                "Default for record must be a map/object or null".to_string(),
            )),
        },
    }
}
845
/// Decodes the next value for this decoder from `buf` and appends it to the buffers.
///
/// Any cursor or parsing failure is returned as an error. For composite decoders, child
/// buffers may already hold partially decoded data when an error is returned.
fn decode(&mut self, buf: &mut AvroCursor<'_>) -> Result<(), ArrowError> {
    match self {
        // Nothing is read from the cursor for a null; only the count grows.
        Self::Null(x) => *x += 1,
        Self::Boolean(values) => values.append(buf.get_bool()?),
        Self::Int32(values) | Self::Date32(values) | Self::TimeMillis(values) => {
            values.push(buf.get_int()?)
        }
        Self::Int64(values)
        | Self::TimeMicros(values)
        | Self::TimestampMillis(_, values)
        | Self::TimestampMicros(_, values) => values.push(buf.get_long()?),
        #[cfg(feature = "avro_custom_types")]
        Self::DurationSecond(values)
        | Self::DurationMillisecond(values)
        | Self::DurationMicrosecond(values)
        | Self::DurationNanosecond(values) => values.push(buf.get_long()?),
        Self::Float32(values) => values.push(buf.get_float()?),
        Self::Float64(values) => values.push(buf.get_double()?),
        // Promotions: read with the writer's type, store converted to the reader's type.
        Self::Int32ToInt64(values) => values.push(buf.get_int()? as i64),
        Self::Int32ToFloat32(values) => values.push(buf.get_int()? as f32),
        Self::Int32ToFloat64(values) => values.push(buf.get_int()? as f64),
        Self::Int64ToFloat32(values) => values.push(buf.get_long()? as f32),
        Self::Int64ToFloat64(values) => values.push(buf.get_long()? as f64),
        Self::Float32ToFloat64(values) => values.push(buf.get_float()? as f64),
        // All byte-like values are read the same way; the bytes are copied as-is here.
        Self::StringToBytes(offsets, values)
        | Self::BytesToString(offsets, values)
        | Self::Binary(offsets, values)
        | Self::String(offsets, values)
        | Self::StringView(offsets, values) => {
            let data = buf.get_bytes()?;
            offsets.push_length(data.len());
            values.extend_from_slice(data);
        }
        // The value is read as UTF-8 text, parsed, and stored as the UUID's raw bytes.
        Self::Uuid(values) => {
            let s_bytes = buf.get_bytes()?;
            let s = std::str::from_utf8(s_bytes).map_err(|e| {
                ArrowError::ParseError(format!("UUID bytes are not valid UTF-8: {e}"))
            })?;
            let uuid = Uuid::try_parse(s)
                .map_err(|e| ArrowError::ParseError(format!("Failed to parse uuid: {e}")))?;
            values.extend_from_slice(uuid.as_bytes());
        }
        // `read_blocks` (defined elsewhere) runs the closure per item; the count it returns
        // becomes the length of this list entry.
        Self::Array(_, off, encoding) => {
            let total_items = read_blocks(buf, |cursor| encoding.decode(cursor))?;
            off.push_length(total_items);
        }
        Self::Record(_, encodings, None) => {
            for encoding in encodings {
                encoding.decode(buf)?;
            }
        }
        Self::Record(_, encodings, Some(proj)) => {
            proj.project_record(buf, encodings)?;
        }
        // Each map item is a key (bytes) followed by a value for the value decoder.
        Self::Map(_, koff, moff, kdata, valdec) => {
            let newly_added = read_blocks(buf, |cur| {
                let kb = cur.get_bytes()?;
                koff.push_length(kb.len());
                kdata.extend_from_slice(kb);
                valdec.decode(cur)
            })?;
            moff.push_length(newly_added);
        }
        Self::Fixed(sz, accum) => {
            let fx = buf.get_fixed(*sz as usize)?;
            accum.extend_from_slice(fx);
        }
        #[cfg(feature = "small_decimals")]
        Self::Decimal32(_, _, size, builder) => {
            decode_decimal!(size, buf, builder, 4, i32);
        }
        #[cfg(feature = "small_decimals")]
        Self::Decimal64(_, _, size, builder) => {
            decode_decimal!(size, buf, builder, 8, i64);
        }
        Self::Decimal128(_, _, size, builder) => {
            decode_decimal!(size, buf, builder, 16, i128);
        }
        Self::Decimal256(_, _, size, builder) => {
            decode_decimal!(size, buf, builder, 32, i256);
        }
        // Without a resolution the index read from the input is stored unchanged.
        Self::Enum(indices, _, None) => {
            indices.push(buf.get_int()?);
        }
        // With a resolution the read index is translated through the mapping; an
        // out-of-range or negative mapping falls back to the default index, and a negative
        // default means the value cannot be resolved.
        Self::Enum(indices, _, Some(res)) => {
            let raw = buf.get_int()?;
            let resolved = usize::try_from(raw)
                .ok()
                .and_then(|idx| res.mapping.get(idx).copied())
                .filter(|&idx| idx >= 0)
                .unwrap_or(res.default_index);
            if resolved >= 0 {
                indices.push(resolved);
            } else {
                return Err(ArrowError::ParseError(format!(
                    "Enum symbol index {raw} not resolvable and no default provided",
                )));
            }
        }
        // 12 fixed bytes: three little-endian u32 values (months, days, milliseconds).
        Self::Duration(builder) => {
            let b = buf.get_fixed(12)?;
            // NOTE(review): the unwraps rely on `get_fixed(12)` returning exactly 12 bytes.
            let months = u32::from_le_bytes(b[0..4].try_into().unwrap());
            let days = u32::from_le_bytes(b[4..8].try_into().unwrap());
            let millis = u32::from_le_bytes(b[8..12].try_into().unwrap());
            let nanos = (millis as i64) * 1_000_000;
            builder.append_value(IntervalMonthDayNano::new(months as i32, days as i32, nanos));
        }
        #[cfg(feature = "avro_custom_types")]
        Self::RunEndEncoded(_, len, inner) => {
            *len += 1;
            inner.decode(buf)?;
        }
        Self::Union(u) => u.decode(buf)?,
        Self::Nullable(order, nb, encoding, plan) => {
            match *plan {
                // No tag in the input: decode directly with the promotion, slot is valid.
                NullablePlan::FromSingle { promotion } => {
                    encoding.decode_with_promotion(buf, promotion)?;
                    nb.append(true);
                }
                // The branch tag is compared with zero: with `NullFirst` a non-zero tag
                // selects the value branch, with `NullSecond` a zero tag does.
                NullablePlan::ReadTag => {
                    let branch = buf.read_vlq()?;
                    let is_not_null = match *order {
                        Nullability::NullFirst => branch != 0,
                        Nullability::NullSecond => branch == 0,
                    };
                    if is_not_null {
                        encoding.decode(buf)?;
                    } else {
                        encoding.append_null()?;
                    }
                    // Validity is recorded only after the inner decoder succeeded.
                    nb.append(is_not_null);
                }
            }
        }
    }
    Ok(())
}
985
986 fn decode_with_promotion(
987 &mut self,
988 buf: &mut AvroCursor<'_>,
989 promotion: Promotion,
990 ) -> Result<(), ArrowError> {
991 #[cfg(feature = "avro_custom_types")]
992 if let Self::RunEndEncoded(_, len, inner) = self {
993 *len += 1;
994 return inner.decode_with_promotion(buf, promotion);
995 }
996
997 macro_rules! promote_numeric_to {
998 ($variant:ident, $getter:ident, $to:ty) => {{
999 match self {
1000 Self::$variant(v) => {
1001 let x = buf.$getter()?;
1002 v.push(x as $to);
1003 Ok(())
1004 }
1005 other => Err(ArrowError::ParseError(format!(
1006 "Promotion {promotion} target mismatch: expected {}, got {}",
1007 stringify!($variant),
1008 <Self as ::std::convert::AsRef<str>>::as_ref(other)
1009 ))),
1010 }
1011 }};
1012 }
1013 match promotion {
1014 Promotion::Direct => self.decode(buf),
1015 Promotion::IntToLong => promote_numeric_to!(Int64, get_int, i64),
1016 Promotion::IntToFloat => promote_numeric_to!(Float32, get_int, f32),
1017 Promotion::IntToDouble => promote_numeric_to!(Float64, get_int, f64),
1018 Promotion::LongToFloat => promote_numeric_to!(Float32, get_long, f32),
1019 Promotion::LongToDouble => promote_numeric_to!(Float64, get_long, f64),
1020 Promotion::FloatToDouble => promote_numeric_to!(Float64, get_float, f64),
1021 Promotion::StringToBytes => match self {
1022 Self::Binary(offsets, values) | Self::StringToBytes(offsets, values) => {
1023 let data = buf.get_bytes()?;
1024 offsets.push_length(data.len());
1025 values.extend_from_slice(data);
1026 Ok(())
1027 }
1028 other => Err(ArrowError::ParseError(format!(
1029 "Promotion {promotion} target mismatch: expected bytes (Binary/StringToBytes), got {}",
1030 <Self as AsRef<str>>::as_ref(other)
1031 ))),
1032 },
1033 Promotion::BytesToString => match self {
1034 Self::String(offsets, values)
1035 | Self::StringView(offsets, values)
1036 | Self::BytesToString(offsets, values) => {
1037 let data = buf.get_bytes()?;
1038 offsets.push_length(data.len());
1039 values.extend_from_slice(data);
1040 Ok(())
1041 }
1042 other => Err(ArrowError::ParseError(format!(
1043 "Promotion {promotion} target mismatch: expected string (String/StringView/BytesToString), got {}",
1044 <Self as AsRef<str>>::as_ref(other)
1045 ))),
1046 },
1047 }
1048 }
1049
1050 fn flush(&mut self, nulls: Option<NullBuffer>) -> Result<ArrayRef, ArrowError> {
1052 Ok(match self {
1053 Self::Nullable(_, n, e, _) => e.flush(n.finish())?,
1054 Self::Null(size) => Arc::new(NullArray::new(std::mem::replace(size, 0))),
1055 Self::Boolean(b) => Arc::new(BooleanArray::new(b.finish(), nulls)),
1056 Self::Int32(values) => Arc::new(flush_primitive::<Int32Type>(values, nulls)),
1057 Self::Date32(values) => Arc::new(flush_primitive::<Date32Type>(values, nulls)),
1058 Self::Int64(values) => Arc::new(flush_primitive::<Int64Type>(values, nulls)),
1059 Self::TimeMillis(values) => {
1060 Arc::new(flush_primitive::<Time32MillisecondType>(values, nulls))
1061 }
1062 Self::TimeMicros(values) => {
1063 Arc::new(flush_primitive::<Time64MicrosecondType>(values, nulls))
1064 }
1065 Self::TimestampMillis(is_utc, values) => Arc::new(
1066 flush_primitive::<TimestampMillisecondType>(values, nulls)
1067 .with_timezone_opt(is_utc.then(|| "+00:00")),
1068 ),
1069 Self::TimestampMicros(is_utc, values) => Arc::new(
1070 flush_primitive::<TimestampMicrosecondType>(values, nulls)
1071 .with_timezone_opt(is_utc.then(|| "+00:00")),
1072 ),
1073 #[cfg(feature = "avro_custom_types")]
1074 Self::DurationSecond(values) => {
1075 Arc::new(flush_primitive::<DurationSecondType>(values, nulls))
1076 }
1077 #[cfg(feature = "avro_custom_types")]
1078 Self::DurationMillisecond(values) => {
1079 Arc::new(flush_primitive::<DurationMillisecondType>(values, nulls))
1080 }
1081 #[cfg(feature = "avro_custom_types")]
1082 Self::DurationMicrosecond(values) => {
1083 Arc::new(flush_primitive::<DurationMicrosecondType>(values, nulls))
1084 }
1085 #[cfg(feature = "avro_custom_types")]
1086 Self::DurationNanosecond(values) => {
1087 Arc::new(flush_primitive::<DurationNanosecondType>(values, nulls))
1088 }
1089 Self::Float32(values) => Arc::new(flush_primitive::<Float32Type>(values, nulls)),
1090 Self::Float64(values) => Arc::new(flush_primitive::<Float64Type>(values, nulls)),
1091 Self::Int32ToInt64(values) => Arc::new(flush_primitive::<Int64Type>(values, nulls)),
1092 Self::Int32ToFloat32(values) | Self::Int64ToFloat32(values) => {
1093 Arc::new(flush_primitive::<Float32Type>(values, nulls))
1094 }
1095 Self::Int32ToFloat64(values)
1096 | Self::Int64ToFloat64(values)
1097 | Self::Float32ToFloat64(values) => {
1098 Arc::new(flush_primitive::<Float64Type>(values, nulls))
1099 }
1100 Self::StringToBytes(offsets, values) | Self::Binary(offsets, values) => {
1101 let offsets = flush_offsets(offsets);
1102 let values = flush_values(values).into();
1103 Arc::new(BinaryArray::try_new(offsets, values, nulls)?)
1104 }
1105 Self::BytesToString(offsets, values) | Self::String(offsets, values) => {
1106 let offsets = flush_offsets(offsets);
1107 let values = flush_values(values).into();
1108 Arc::new(StringArray::try_new(offsets, values, nulls)?)
1109 }
1110 Self::StringView(offsets, values) => {
1111 let offsets = flush_offsets(offsets);
1112 let values = flush_values(values);
1113 let array = StringArray::try_new(offsets, values.into(), nulls.clone())?;
1114 let values: Vec<&str> = (0..array.len())
1115 .map(|i| {
1116 if array.is_valid(i) {
1117 array.value(i)
1118 } else {
1119 ""
1120 }
1121 })
1122 .collect();
1123 Arc::new(StringViewArray::from(values))
1124 }
1125 Self::Array(field, offsets, values) => {
1126 let values = values.flush(None)?;
1127 let offsets = flush_offsets(offsets);
1128 Arc::new(ListArray::try_new(field.clone(), offsets, values, nulls)?)
1129 }
1130 Self::Record(fields, encodings, _) => {
1131 let arrays = encodings
1132 .iter_mut()
1133 .map(|x| x.flush(None))
1134 .collect::<Result<Vec<_>, _>>()?;
1135 Arc::new(StructArray::try_new(fields.clone(), arrays, nulls)?)
1136 }
1137 Self::Map(map_field, k_off, m_off, kdata, valdec) => {
1138 let moff = flush_offsets(m_off);
1139 let koff = flush_offsets(k_off);
1140 let kd = flush_values(kdata).into();
1141 let val_arr = valdec.flush(None)?;
1142 let key_arr = StringArray::try_new(koff, kd, None)?;
1143 if key_arr.len() != val_arr.len() {
1144 return Err(ArrowError::InvalidArgumentError(format!(
1145 "Map keys length ({}) != map values length ({})",
1146 key_arr.len(),
1147 val_arr.len()
1148 )));
1149 }
1150 let final_len = moff.len() - 1;
1151 if let Some(n) = &nulls {
1152 if n.len() != final_len {
1153 return Err(ArrowError::InvalidArgumentError(format!(
1154 "Map array null buffer length {} != final map length {final_len}",
1155 n.len()
1156 )));
1157 }
1158 }
1159 let entries_fields = match map_field.data_type() {
1160 DataType::Struct(fields) => fields.clone(),
1161 other => {
1162 return Err(ArrowError::InvalidArgumentError(format!(
1163 "Map entries field must be a Struct, got {other:?}"
1164 )));
1165 }
1166 };
1167 let entries_struct =
1168 StructArray::try_new(entries_fields, vec![Arc::new(key_arr), val_arr], None)?;
1169 let map_arr =
1170 MapArray::try_new(map_field.clone(), moff, entries_struct, nulls, false)?;
1171 Arc::new(map_arr)
1172 }
1173 Self::Fixed(sz, accum) => {
1174 let b: Buffer = flush_values(accum).into();
1175 let arr = FixedSizeBinaryArray::try_new(*sz, b, nulls)
1176 .map_err(|e| ArrowError::ParseError(e.to_string()))?;
1177 Arc::new(arr)
1178 }
1179 Self::Uuid(values) => {
1180 let arr = FixedSizeBinaryArray::try_new(16, std::mem::take(values).into(), nulls)
1181 .map_err(|e| ArrowError::ParseError(e.to_string()))?;
1182 Arc::new(arr)
1183 }
1184 #[cfg(feature = "small_decimals")]
1185 Self::Decimal32(precision, scale, _, builder) => {
1186 flush_decimal!(builder, precision, scale, nulls, Decimal32Array)
1187 }
1188 #[cfg(feature = "small_decimals")]
1189 Self::Decimal64(precision, scale, _, builder) => {
1190 flush_decimal!(builder, precision, scale, nulls, Decimal64Array)
1191 }
1192 Self::Decimal128(precision, scale, _, builder) => {
1193 flush_decimal!(builder, precision, scale, nulls, Decimal128Array)
1194 }
1195 Self::Decimal256(precision, scale, _, builder) => {
1196 flush_decimal!(builder, precision, scale, nulls, Decimal256Array)
1197 }
1198 Self::Enum(indices, symbols, _) => flush_dict(indices, symbols, nulls)?,
1199 Self::Duration(builder) => {
1200 let (_, vals, _) = builder.finish().into_parts();
1201 let vals = IntervalMonthDayNanoArray::try_new(vals, nulls)
1202 .map_err(|e| ArrowError::ParseError(e.to_string()))?;
1203 Arc::new(vals)
1204 }
1205 #[cfg(feature = "avro_custom_types")]
1206 Self::RunEndEncoded(width, len, inner) => {
1207 let values = inner.flush(nulls)?;
1208 let n = *len;
1209 let arr = values.as_ref();
1210 let mut run_starts: Vec<usize> = Vec::with_capacity(n);
1211 if n > 0 {
1212 run_starts.push(0);
1213 for i in 1..n {
1214 if !values_equal_at(arr, i - 1, i) {
1215 run_starts.push(i);
1216 }
1217 }
1218 }
1219 if n > (u32::MAX as usize) {
1220 return Err(ArrowError::InvalidArgumentError(format!(
1221 "RunEndEncoded length {n} exceeds maximum supported by UInt32 indices for take",
1222 )));
1223 }
1224 let run_count = run_starts.len();
1225 let take_idx: PrimitiveArray<UInt32Type> =
1226 run_starts.iter().map(|&s| s as u32).collect();
1227 let per_run_values = if run_count == 0 {
1228 values.slice(0, 0)
1229 } else {
1230 take(arr, &take_idx, Option::from(TakeOptions::default())).map_err(|e| {
1231 ArrowError::ParseError(format!("take() for REE values failed: {e}"))
1232 })?
1233 };
1234
1235 macro_rules! build_run_array {
1236 ($Native:ty, $ArrowTy:ty) => {{
1237 let mut ends: Vec<$Native> = Vec::with_capacity(run_count);
1238 for (idx, &_start) in run_starts.iter().enumerate() {
1239 let end = if idx + 1 < run_count {
1240 run_starts[idx + 1]
1241 } else {
1242 n
1243 };
1244 ends.push(end as $Native);
1245 }
1246 let ends: PrimitiveArray<$ArrowTy> = ends.into_iter().collect();
1247 let run_arr = RunArray::<$ArrowTy>::try_new(&ends, per_run_values.as_ref())
1248 .map_err(|e| ArrowError::ParseError(e.to_string()))?;
1249 Arc::new(run_arr) as ArrayRef
1250 }};
1251 }
1252 match *width {
1253 2 => {
1254 if n > i16::MAX as usize {
1255 return Err(ArrowError::InvalidArgumentError(format!(
1256 "RunEndEncoded length {n} exceeds i16::MAX for run end width 2"
1257 )));
1258 }
1259 build_run_array!(i16, Int16Type)
1260 }
1261 4 => build_run_array!(i32, Int32Type),
1262 8 => build_run_array!(i64, Int64Type),
1263 other => {
1264 return Err(ArrowError::InvalidArgumentError(format!(
1265 "Unsupported run-end width {other} for RunEndEncoded"
1266 )));
1267 }
1268 }
1269 }
1270 Self::Union(u) => u.flush(nulls)?,
1271 })
1272 }
1273}
1274
1275#[derive(Debug)]
1277struct DispatchLookupTable {
1278 to_reader: Box<[i8]>,
1294 promotion: Box<[Promotion]>,
1299}
1300
1301const NO_SOURCE: i8 = -1;
1304
1305impl DispatchLookupTable {
1306 fn from_writer_to_reader(
1307 promotion_map: &[Option<(usize, Promotion)>],
1308 ) -> Result<Self, ArrowError> {
1309 let mut to_reader = Vec::with_capacity(promotion_map.len());
1310 let mut promotion = Vec::with_capacity(promotion_map.len());
1311 for map in promotion_map {
1312 match *map {
1313 Some((idx, promo)) => {
1314 let idx_i8 = i8::try_from(idx).map_err(|_| {
1315 ArrowError::SchemaError(format!(
1316 "Reader branch index {idx} exceeds i8 range (max {})",
1317 i8::MAX
1318 ))
1319 })?;
1320 to_reader.push(idx_i8);
1321 promotion.push(promo);
1322 }
1323 None => {
1324 to_reader.push(NO_SOURCE);
1325 promotion.push(Promotion::Direct);
1326 }
1327 }
1328 }
1329 Ok(Self {
1330 to_reader: to_reader.into_boxed_slice(),
1331 promotion: promotion.into_boxed_slice(),
1332 })
1333 }
1334
1335 #[inline]
1337 fn resolve(&self, writer_index: usize) -> Option<(usize, Promotion)> {
1338 let reader_index = *self.to_reader.get(writer_index)?;
1339 (reader_index >= 0).then(|| (reader_index as usize, self.promotion[writer_index]))
1340 }
1341}
1342
1343#[derive(Debug)]
1344struct UnionDecoder {
1345 fields: UnionFields,
1346 type_ids: Vec<i8>,
1347 offsets: Vec<i32>,
1348 branches: Vec<Decoder>,
1349 counts: Vec<i32>,
1350 reader_type_codes: Vec<i8>,
1351 default_emit_idx: usize,
1352 null_emit_idx: usize,
1353 plan: UnionReadPlan,
1354}
1355
1356impl Default for UnionDecoder {
1357 fn default() -> Self {
1358 Self {
1359 fields: UnionFields::empty(),
1360 type_ids: Vec::new(),
1361 offsets: Vec::new(),
1362 branches: Vec::new(),
1363 counts: Vec::new(),
1364 reader_type_codes: Vec::new(),
1365 default_emit_idx: 0,
1366 null_emit_idx: 0,
1367 plan: UnionReadPlan::Passthrough,
1368 }
1369 }
1370}
1371
1372#[derive(Debug)]
1373enum UnionReadPlan {
1374 ReaderUnion {
1375 lookup_table: DispatchLookupTable,
1376 },
1377 FromSingle {
1378 reader_idx: usize,
1379 promotion: Promotion,
1380 },
1381 ToSingle {
1382 target: Box<Decoder>,
1383 lookup_table: DispatchLookupTable,
1384 },
1385 Passthrough,
1386}
1387
1388impl UnionDecoder {
1389 fn try_new(
1390 fields: UnionFields,
1391 branches: Vec<Decoder>,
1392 resolved: Option<ResolvedUnion>,
1393 ) -> Result<Self, ArrowError> {
1394 let reader_type_codes = fields.iter().map(|(tid, _)| tid).collect::<Vec<i8>>();
1395 let null_branch = branches.iter().position(|b| matches!(b, Decoder::Null(_)));
1396 let default_emit_idx = 0;
1397 let null_emit_idx = null_branch.unwrap_or(default_emit_idx);
1398 let branch_len = branches.len().max(reader_type_codes.len());
1399 let max_addr = (i32::MAX as usize) + 1;
1401 if branches.len() > max_addr {
1402 return Err(ArrowError::SchemaError(format!(
1403 "Reader union has {} branches, which exceeds the maximum addressable \
1404 branches by an Avro int tag ({} + 1).",
1405 branches.len(),
1406 i32::MAX
1407 )));
1408 }
1409 Ok(Self {
1410 fields,
1411 type_ids: Vec::with_capacity(DEFAULT_CAPACITY),
1412 offsets: Vec::with_capacity(DEFAULT_CAPACITY),
1413 branches,
1414 counts: vec![0; branch_len],
1415 reader_type_codes,
1416 default_emit_idx,
1417 null_emit_idx,
1418 plan: Self::plan_from_resolved(resolved)?,
1419 })
1420 }
1421
1422 fn try_new_from_writer_union(
1423 info: ResolvedUnion,
1424 target: Box<Decoder>,
1425 ) -> Result<Self, ArrowError> {
1426 debug_assert!(info.writer_is_union && !info.reader_is_union);
1428 let lookup_table = DispatchLookupTable::from_writer_to_reader(&info.writer_to_reader)?;
1429 Ok(Self {
1430 plan: UnionReadPlan::ToSingle {
1431 target,
1432 lookup_table,
1433 },
1434 ..Self::default()
1435 })
1436 }
1437
1438 fn plan_from_resolved(resolved: Option<ResolvedUnion>) -> Result<UnionReadPlan, ArrowError> {
1439 let Some(info) = resolved else {
1440 return Ok(UnionReadPlan::Passthrough);
1441 };
1442 match (info.writer_is_union, info.reader_is_union) {
1443 (true, true) => {
1444 let lookup_table =
1445 DispatchLookupTable::from_writer_to_reader(&info.writer_to_reader)?;
1446 Ok(UnionReadPlan::ReaderUnion { lookup_table })
1447 }
1448 (false, true) => {
1449 let Some(&(reader_idx, promotion)) =
1450 info.writer_to_reader.first().and_then(Option::as_ref)
1451 else {
1452 return Err(ArrowError::SchemaError(
1453 "Writer type does not match any reader union branch".to_string(),
1454 ));
1455 };
1456 Ok(UnionReadPlan::FromSingle {
1457 reader_idx,
1458 promotion,
1459 })
1460 }
1461 (true, false) => Err(ArrowError::InvalidArgumentError(
1462 "UnionDecoder::try_new cannot build writer-union to single; use UnionDecoderBuilder with a target"
1463 .to_string(),
1464 )),
1465 _ => Err(ArrowError::SchemaError(
1467 "ResolvedUnion constructed for non-union sides; resolver should return None"
1468 .to_string(),
1469 )),
1470 }
1471 }
1472
1473 #[inline]
1474 fn read_tag(buf: &mut AvroCursor<'_>) -> Result<usize, ArrowError> {
1475 let raw = buf.get_long()?;
1480 if raw < 0 {
1481 return Err(ArrowError::ParseError(format!(
1482 "Negative union branch index {raw}"
1483 )));
1484 }
1485 usize::try_from(raw).map_err(|_| {
1486 ArrowError::ParseError(format!(
1487 "Union branch index {raw} does not fit into usize on this platform ({}-bit)",
1488 (usize::BITS as usize)
1489 ))
1490 })
1491 }
1492
1493 #[inline]
1494 fn emit_to(&mut self, reader_idx: usize) -> Result<&mut Decoder, ArrowError> {
1495 let branches_len = self.branches.len();
1496 let Some(reader_branch) = self.branches.get_mut(reader_idx) else {
1497 return Err(ArrowError::ParseError(format!(
1498 "Union branch index {reader_idx} out of range ({branches_len} branches)"
1499 )));
1500 };
1501 self.type_ids.push(self.reader_type_codes[reader_idx]);
1502 self.offsets.push(self.counts[reader_idx]);
1503 self.counts[reader_idx] += 1;
1504 Ok(reader_branch)
1505 }
1506
1507 #[inline]
1508 fn on_decoder<F>(&mut self, fallback_idx: usize, action: F) -> Result<(), ArrowError>
1509 where
1510 F: FnOnce(&mut Decoder) -> Result<(), ArrowError>,
1511 {
1512 if let UnionReadPlan::ToSingle { target, .. } = &mut self.plan {
1513 return action(target);
1514 }
1515 let reader_idx = match &self.plan {
1516 UnionReadPlan::FromSingle { reader_idx, .. } => *reader_idx,
1517 _ => fallback_idx,
1518 };
1519 self.emit_to(reader_idx).and_then(action)
1520 }
1521
1522 fn append_null(&mut self) -> Result<(), ArrowError> {
1523 self.on_decoder(self.null_emit_idx, |decoder| decoder.append_null())
1524 }
1525
1526 fn append_default(&mut self, lit: &AvroLiteral) -> Result<(), ArrowError> {
1527 self.on_decoder(self.default_emit_idx, |decoder| decoder.append_default(lit))
1528 }
1529
1530 fn decode(&mut self, buf: &mut AvroCursor<'_>) -> Result<(), ArrowError> {
1531 let (reader_idx, promotion) = match &mut self.plan {
1532 UnionReadPlan::Passthrough => (Self::read_tag(buf)?, Promotion::Direct),
1533 UnionReadPlan::ReaderUnion { lookup_table } => {
1534 let idx = Self::read_tag(buf)?;
1535 lookup_table.resolve(idx).ok_or_else(|| {
1536 ArrowError::ParseError(format!(
1537 "Union branch index {idx} not resolvable by reader schema"
1538 ))
1539 })?
1540 }
1541 UnionReadPlan::FromSingle {
1542 reader_idx,
1543 promotion,
1544 } => (*reader_idx, *promotion),
1545 UnionReadPlan::ToSingle {
1546 target,
1547 lookup_table,
1548 } => {
1549 let idx = Self::read_tag(buf)?;
1550 return match lookup_table.resolve(idx) {
1551 Some((_, promotion)) => target.decode_with_promotion(buf, promotion),
1552 None => Err(ArrowError::ParseError(format!(
1553 "Writer union branch {idx} does not resolve to reader type"
1554 ))),
1555 };
1556 }
1557 };
1558 let decoder = self.emit_to(reader_idx)?;
1559 decoder.decode_with_promotion(buf, promotion)
1560 }
1561
1562 fn flush(&mut self, nulls: Option<NullBuffer>) -> Result<ArrayRef, ArrowError> {
1563 if let UnionReadPlan::ToSingle { target, .. } = &mut self.plan {
1564 return target.flush(nulls);
1565 }
1566 debug_assert!(
1567 nulls.is_none(),
1568 "UnionArray does not accept a validity bitmap; \
1569 nulls should have been materialized as a Null child during decode"
1570 );
1571 let children = self
1572 .branches
1573 .iter_mut()
1574 .map(|d| d.flush(None))
1575 .collect::<Result<Vec<_>, _>>()?;
1576 let arr = UnionArray::try_new(
1577 self.fields.clone(),
1578 flush_values(&mut self.type_ids).into_iter().collect(),
1579 Some(flush_values(&mut self.offsets).into_iter().collect()),
1580 children,
1581 )
1582 .map_err(|e| ArrowError::ParseError(e.to_string()))?;
1583 Ok(Arc::new(arr))
1584 }
1585}
1586
1587#[derive(Debug, Default)]
1588struct UnionDecoderBuilder {
1589 fields: Option<UnionFields>,
1590 branches: Option<Vec<Decoder>>,
1591 resolved: Option<ResolvedUnion>,
1592 target: Option<Box<Decoder>>,
1593}
1594
1595impl UnionDecoderBuilder {
1596 fn new() -> Self {
1597 Self::default()
1598 }
1599
1600 fn with_fields(mut self, fields: UnionFields) -> Self {
1601 self.fields = Some(fields);
1602 self
1603 }
1604
1605 fn with_branches(mut self, branches: Vec<Decoder>) -> Self {
1606 self.branches = Some(branches);
1607 self
1608 }
1609
1610 fn with_resolved_union(mut self, resolved_union: ResolvedUnion) -> Self {
1611 self.resolved = Some(resolved_union);
1612 self
1613 }
1614
1615 fn with_target(mut self, target: Box<Decoder>) -> Self {
1616 self.target = Some(target);
1617 self
1618 }
1619
1620 fn build(self) -> Result<UnionDecoder, ArrowError> {
1621 match (self.resolved, self.fields, self.branches, self.target) {
1622 (resolved, Some(fields), Some(branches), None) => {
1623 UnionDecoder::try_new(fields, branches, resolved)
1624 }
1625 (Some(info), None, None, Some(target))
1626 if info.writer_is_union && !info.reader_is_union =>
1627 {
1628 UnionDecoder::try_new_from_writer_union(info, target)
1629 }
1630 _ => Err(ArrowError::InvalidArgumentError(
1631 "Invalid UnionDecoderBuilder configuration: expected either \
1632 (fields + branches + resolved) with no target for reader-unions, or \
1633 (resolved + target) with no fields/branches for writer-union to single."
1634 .to_string(),
1635 )),
1636 }
1637 }
1638}
1639
/// What `process_blockwise` does with a block whose item count is negative,
/// i.e. a block that is followed by its size in bytes.
#[derive(Debug, Copy, Clone)]
enum NegativeBlockBehavior {
    /// Ignore the byte size and run the per-item callback for every item.
    ProcessItems,
    /// Jump over the block by consuming exactly the stated number of bytes.
    SkipBySize,
}
1645
1646#[inline]
1647fn skip_blocks(
1648 buf: &mut AvroCursor,
1649 mut skip_item: impl FnMut(&mut AvroCursor) -> Result<(), ArrowError>,
1650) -> Result<usize, ArrowError> {
1651 process_blockwise(
1652 buf,
1653 move |c| skip_item(c),
1654 NegativeBlockBehavior::SkipBySize,
1655 )
1656}
1657
1658#[inline]
1659fn flush_dict(
1660 indices: &mut Vec<i32>,
1661 symbols: &[String],
1662 nulls: Option<NullBuffer>,
1663) -> Result<ArrayRef, ArrowError> {
1664 let keys = flush_primitive::<Int32Type>(indices, nulls);
1665 let values = Arc::new(StringArray::from_iter_values(
1666 symbols.iter().map(|s| s.as_str()),
1667 ));
1668 DictionaryArray::try_new(keys, values)
1669 .map_err(|e| ArrowError::ParseError(e.to_string()))
1670 .map(|arr| Arc::new(arr) as ArrayRef)
1671}
1672
1673#[inline]
1674fn read_blocks(
1675 buf: &mut AvroCursor,
1676 decode_entry: impl FnMut(&mut AvroCursor) -> Result<(), ArrowError>,
1677) -> Result<usize, ArrowError> {
1678 process_blockwise(buf, decode_entry, NegativeBlockBehavior::ProcessItems)
1679}
1680
1681#[inline]
1682fn process_blockwise(
1683 buf: &mut AvroCursor,
1684 mut on_item: impl FnMut(&mut AvroCursor) -> Result<(), ArrowError>,
1685 negative_behavior: NegativeBlockBehavior,
1686) -> Result<usize, ArrowError> {
1687 let mut total = 0usize;
1688 loop {
1689 let block_count = buf.get_long()?;
1694 match block_count.cmp(&0) {
1695 Ordering::Equal => break,
1696 Ordering::Less => {
1697 let count = (-block_count) as usize;
1698 let size_in_bytes = buf.get_long()? as usize;
1700 match negative_behavior {
1701 NegativeBlockBehavior::ProcessItems => {
1702 for _ in 0..count {
1704 on_item(buf)?;
1705 }
1706 }
1707 NegativeBlockBehavior::SkipBySize => {
1708 let _ = buf.get_fixed(size_in_bytes)?;
1710 }
1711 }
1712 total += count;
1713 }
1714 Ordering::Greater => {
1715 let count = block_count as usize;
1716 for _ in 0..count {
1717 on_item(buf)?;
1718 }
1719 total += count;
1720 }
1721 }
1722 }
1723 Ok(total)
1724}
1725
1726#[inline]
1727fn flush_values<T>(values: &mut Vec<T>) -> Vec<T> {
1728 std::mem::replace(values, Vec::with_capacity(DEFAULT_CAPACITY))
1729}
1730
1731#[inline]
1732fn flush_offsets(offsets: &mut OffsetBufferBuilder<i32>) -> OffsetBuffer<i32> {
1733 std::mem::replace(offsets, OffsetBufferBuilder::new(DEFAULT_CAPACITY)).finish()
1734}
1735
1736#[inline]
1737fn flush_primitive<T: ArrowPrimitiveType>(
1738 values: &mut Vec<T::Native>,
1739 nulls: Option<NullBuffer>,
1740) -> PrimitiveArray<T> {
1741 PrimitiveArray::new(flush_values(values).into(), nulls)
1742}
1743
1744#[inline]
1745fn read_decimal_bytes_be<const N: usize>(
1746 buf: &mut AvroCursor<'_>,
1747 size: &Option<usize>,
1748) -> Result<[u8; N], ArrowError> {
1749 match size {
1750 Some(n) if *n == N => {
1751 let raw = buf.get_fixed(N)?;
1752 let mut arr = [0u8; N];
1753 arr.copy_from_slice(raw);
1754 Ok(arr)
1755 }
1756 Some(n) => {
1757 let raw = buf.get_fixed(*n)?;
1758 sign_cast_to::<N>(raw)
1759 }
1760 None => {
1761 let raw = buf.get_bytes()?;
1762 sign_cast_to::<N>(raw)
1763 }
1764 }
1765}
1766
1767#[inline]
1776fn sign_cast_to<const N: usize>(raw: &[u8]) -> Result<[u8; N], ArrowError> {
1777 let len = raw.len();
1778 if len == N {
1780 let mut out = [0u8; N];
1781 out.copy_from_slice(raw);
1782 return Ok(out);
1783 }
1784 let first = raw.first().copied().unwrap_or(0u8);
1786 let sign_byte = if (first & 0x80) == 0 { 0x00 } else { 0xFF };
1787 let mut out = [sign_byte; N];
1789 if len > N {
1790 let extra = len - N;
1793 if raw[..extra].iter().any(|&b| b != sign_byte) {
1795 return Err(ArrowError::ParseError(format!(
1796 "Decimal value with {} bytes cannot be represented in {} bytes without overflow",
1797 len, N
1798 )));
1799 }
1800 if N > 0 {
1801 let first_kept = raw[extra];
1802 let sign_bit_mismatch = ((first_kept ^ sign_byte) & 0x80) != 0;
1803 if sign_bit_mismatch {
1804 return Err(ArrowError::ParseError(format!(
1805 "Decimal value with {} bytes cannot be represented in {} bytes without overflow",
1806 len, N
1807 )));
1808 }
1809 }
1810 out.copy_from_slice(&raw[extra..]);
1811 return Ok(out);
1812 }
1813 out[N - len..].copy_from_slice(raw);
1814 Ok(out)
1815}
1816
/// Reports whether rows `i` and `j` of `arr` hold the same value.
///
/// Two nulls compare equal, a null never equals a non-null, and two non-null
/// rows are compared through their one-element slices.
#[cfg(feature = "avro_custom_types")]
#[inline]
fn values_equal_at(arr: &dyn Array, i: usize, j: usize) -> bool {
    let (left_null, right_null) = (arr.is_null(i), arr.is_null(j));
    if left_null || right_null {
        return left_null && right_null;
    }
    let left = arr.slice(i, 1);
    let right = arr.slice(j, 1);
    left == right
}
1830
1831#[derive(Debug)]
1832struct Projector {
1833 writer_to_reader: Arc<[Option<usize>]>,
1834 skip_decoders: Vec<Option<Skipper>>,
1835 field_defaults: Vec<Option<AvroLiteral>>,
1836 default_injections: Arc<[(usize, AvroLiteral)]>,
1837}
1838
1839#[derive(Debug)]
1840struct ProjectorBuilder<'a> {
1841 rec: &'a ResolvedRecord,
1842 reader_fields: Arc<[AvroField]>,
1843}
1844
1845impl<'a> ProjectorBuilder<'a> {
1846 #[inline]
1847 fn try_new(rec: &'a ResolvedRecord, reader_fields: &Arc<[AvroField]>) -> Self {
1848 Self {
1849 rec,
1850 reader_fields: reader_fields.clone(),
1851 }
1852 }
1853
1854 #[inline]
1855 fn build(self) -> Result<Projector, ArrowError> {
1856 let reader_fields = self.reader_fields;
1857 let mut field_defaults: Vec<Option<AvroLiteral>> = Vec::with_capacity(reader_fields.len());
1858 for avro_field in reader_fields.as_ref() {
1859 if let Some(ResolutionInfo::DefaultValue(lit)) =
1860 avro_field.data_type().resolution.as_ref()
1861 {
1862 field_defaults.push(Some(lit.clone()));
1863 } else {
1864 field_defaults.push(None);
1865 }
1866 }
1867 let mut default_injections: Vec<(usize, AvroLiteral)> =
1868 Vec::with_capacity(self.rec.default_fields.len());
1869 for &idx in self.rec.default_fields.as_ref() {
1870 let lit = field_defaults
1871 .get(idx)
1872 .and_then(|lit| lit.clone())
1873 .unwrap_or(AvroLiteral::Null);
1874 default_injections.push((idx, lit));
1875 }
1876 let mut skip_decoders: Vec<Option<Skipper>> =
1877 Vec::with_capacity(self.rec.skip_fields.len());
1878 for datatype in self.rec.skip_fields.as_ref() {
1879 let skipper = match datatype {
1880 Some(datatype) => Some(Skipper::from_avro(datatype)?),
1881 None => None,
1882 };
1883 skip_decoders.push(skipper);
1884 }
1885 Ok(Projector {
1886 writer_to_reader: self.rec.writer_to_reader.clone(),
1887 skip_decoders,
1888 field_defaults,
1889 default_injections: default_injections.into(),
1890 })
1891 }
1892}
1893
1894impl Projector {
1895 #[inline]
1896 fn project_default(&self, decoder: &mut Decoder, index: usize) -> Result<(), ArrowError> {
1897 if let Some(default_literal) = self.field_defaults[index].as_ref() {
1904 decoder.append_default(default_literal)
1905 } else {
1906 decoder.append_null()
1907 }
1908 }
1909
1910 #[inline]
1911 fn project_record(
1912 &mut self,
1913 buf: &mut AvroCursor<'_>,
1914 encodings: &mut [Decoder],
1915 ) -> Result<(), ArrowError> {
1916 debug_assert_eq!(
1917 self.writer_to_reader.len(),
1918 self.skip_decoders.len(),
1919 "internal invariant: mapping and skipper lists must have equal length"
1920 );
1921 for (i, (mapping, skipper_opt)) in self
1922 .writer_to_reader
1923 .iter()
1924 .zip(self.skip_decoders.iter_mut())
1925 .enumerate()
1926 {
1927 match (mapping, skipper_opt.as_mut()) {
1928 (Some(reader_index), _) => encodings[*reader_index].decode(buf)?,
1929 (None, Some(skipper)) => skipper.skip(buf)?,
1930 (None, None) => {
1931 return Err(ArrowError::SchemaError(format!(
1932 "No skipper available for writer-only field at index {i}",
1933 )));
1934 }
1935 }
1936 }
1937 for (reader_index, lit) in self.default_injections.as_ref() {
1938 encodings[*reader_index].append_default(lit)?;
1939 }
1940 Ok(())
1941 }
1942}
1943
1944#[derive(Debug)]
1950enum Skipper {
1951 Null,
1952 Boolean,
1953 Int32,
1954 Int64,
1955 Float32,
1956 Float64,
1957 Bytes,
1958 String,
1959 TimeMicros,
1960 TimestampMillis,
1961 TimestampMicros,
1962 Fixed(usize),
1963 Decimal(Option<usize>),
1964 UuidString,
1965 Enum,
1966 DurationFixed12,
1967 List(Box<Skipper>),
1968 Map(Box<Skipper>),
1969 Struct(Vec<Skipper>),
1970 Union(Vec<Skipper>),
1971 Nullable(Nullability, Box<Skipper>),
1972 #[cfg(feature = "avro_custom_types")]
1973 RunEndEncoded(Box<Skipper>),
1974}
1975
1976impl Skipper {
1977 fn from_avro(dt: &AvroDataType) -> Result<Self, ArrowError> {
1978 let mut base = match dt.codec() {
1979 Codec::Null => Self::Null,
1980 Codec::Boolean => Self::Boolean,
1981 Codec::Int32 | Codec::Date32 | Codec::TimeMillis => Self::Int32,
1982 Codec::Int64 => Self::Int64,
1983 Codec::TimeMicros => Self::TimeMicros,
1984 Codec::TimestampMillis(_) => Self::TimestampMillis,
1985 Codec::TimestampMicros(_) => Self::TimestampMicros,
1986 #[cfg(feature = "avro_custom_types")]
1987 Codec::DurationNanos
1988 | Codec::DurationMicros
1989 | Codec::DurationMillis
1990 | Codec::DurationSeconds => Self::Int64,
1991 Codec::Float32 => Self::Float32,
1992 Codec::Float64 => Self::Float64,
1993 Codec::Binary => Self::Bytes,
1994 Codec::Utf8 | Codec::Utf8View => Self::String,
1995 Codec::Fixed(sz) => Self::Fixed(*sz as usize),
1996 Codec::Decimal(_, _, size) => Self::Decimal(*size),
1997 Codec::Uuid => Self::UuidString, Codec::Enum(_) => Self::Enum,
1999 Codec::List(item) => Self::List(Box::new(Skipper::from_avro(item)?)),
2000 Codec::Struct(fields) => Self::Struct(
2001 fields
2002 .iter()
2003 .map(|f| Skipper::from_avro(f.data_type()))
2004 .collect::<Result<_, _>>()?,
2005 ),
2006 Codec::Map(values) => Self::Map(Box::new(Skipper::from_avro(values)?)),
2007 Codec::Interval => Self::DurationFixed12,
2008 Codec::Union(encodings, _, _) => {
2009 let max_addr = (i32::MAX as usize) + 1;
2010 if encodings.len() > max_addr {
2011 return Err(ArrowError::SchemaError(format!(
2012 "Writer union has {} branches, which exceeds the maximum addressable \
2013 branches by an Avro int tag ({} + 1).",
2014 encodings.len(),
2015 i32::MAX
2016 )));
2017 }
2018 Self::Union(
2019 encodings
2020 .iter()
2021 .map(Skipper::from_avro)
2022 .collect::<Result<_, _>>()?,
2023 )
2024 }
2025 #[cfg(feature = "avro_custom_types")]
2026 Codec::RunEndEncoded(inner, _w) => {
2027 Self::RunEndEncoded(Box::new(Skipper::from_avro(inner)?))
2028 }
2029 };
2030 if let Some(n) = dt.nullability() {
2031 base = Self::Nullable(n, Box::new(base));
2032 }
2033 Ok(base)
2034 }
2035
2036 fn skip(&mut self, buf: &mut AvroCursor<'_>) -> Result<(), ArrowError> {
2037 match self {
2038 Self::Null => Ok(()),
2039 Self::Boolean => {
2040 buf.get_bool()?;
2041 Ok(())
2042 }
2043 Self::Int32 => {
2044 buf.get_int()?;
2045 Ok(())
2046 }
2047 Self::Int64 | Self::TimeMicros | Self::TimestampMillis | Self::TimestampMicros => {
2048 buf.get_long()?;
2049 Ok(())
2050 }
2051 Self::Float32 => {
2052 buf.get_float()?;
2053 Ok(())
2054 }
2055 Self::Float64 => {
2056 buf.get_double()?;
2057 Ok(())
2058 }
2059 Self::Bytes | Self::String | Self::UuidString => {
2060 buf.get_bytes()?;
2061 Ok(())
2062 }
2063 Self::Fixed(sz) => {
2064 buf.get_fixed(*sz)?;
2065 Ok(())
2066 }
2067 Self::Decimal(size) => {
2068 if let Some(s) = size {
2069 buf.get_fixed(*s)
2070 } else {
2071 buf.get_bytes()
2072 }?;
2073 Ok(())
2074 }
2075 Self::Enum => {
2076 buf.get_int()?;
2077 Ok(())
2078 }
2079 Self::DurationFixed12 => {
2080 buf.get_fixed(12)?;
2081 Ok(())
2082 }
2083 Self::List(item) => {
2084 skip_blocks(buf, |c| item.skip(c))?;
2085 Ok(())
2086 }
2087 Self::Map(value) => {
2088 skip_blocks(buf, |c| {
2089 c.get_bytes()?; value.skip(c)
2091 })?;
2092 Ok(())
2093 }
2094 Self::Struct(fields) => {
2095 for f in fields.iter_mut() {
2096 f.skip(buf)?
2097 }
2098 Ok(())
2099 }
2100 Self::Union(encodings) => {
2101 let raw = buf.get_long()?;
2103 if raw < 0 {
2104 return Err(ArrowError::ParseError(format!(
2105 "Negative union branch index {raw}"
2106 )));
2107 }
2108 let idx: usize = usize::try_from(raw).map_err(|_| {
2109 ArrowError::ParseError(format!(
2110 "Union branch index {raw} does not fit into usize on this platform ({}-bit)",
2111 (usize::BITS as usize)
2112 ))
2113 })?;
2114 let Some(encoding) = encodings.get_mut(idx) else {
2115 return Err(ArrowError::ParseError(format!(
2116 "Union branch index {idx} out of range for skipper ({} branches)",
2117 encodings.len()
2118 )));
2119 };
2120 encoding.skip(buf)
2121 }
2122 Self::Nullable(order, inner) => {
2123 let branch = buf.read_vlq()?;
2124 let is_not_null = match *order {
2125 Nullability::NullFirst => branch != 0,
2126 Nullability::NullSecond => branch == 0,
2127 };
2128 if is_not_null {
2129 inner.skip(buf)?;
2130 }
2131 Ok(())
2132 }
2133 #[cfg(feature = "avro_custom_types")]
2134 Self::RunEndEncoded(inner) => inner.skip(buf),
2135 }
2136 }
2137}
2138
2139#[cfg(test)]
2140mod tests {
2141 use super::*;
2142 use crate::codec::AvroFieldBuilder;
2143 use crate::schema::{Attributes, ComplexType, Field, PrimitiveType, Record, Schema, TypeName};
2144 use arrow_array::cast::AsArray;
2145 use indexmap::IndexMap;
2146 use std::collections::HashMap;
2147
/// Encodes `value` the way Avro writes an `int`: zig-zag mapping followed by
/// a base-128 varint, least-significant 7-bit group first.
///
/// The zig-zag result is reinterpreted as `u32` before the loop so that the
/// right shifts are logical. With a signed accumulator, any input whose
/// zig-zag form has the top bit set (for example `i32::MIN` or `i32::MAX`)
/// is shifted arithmetically, never reaches zero, and the loop never ends.
fn encode_avro_int(value: i32) -> Vec<u8> {
    let mut buf = Vec::new();
    // Zig-zag: fold the sign into the lowest bit, then treat the bits as unsigned.
    let mut v = ((value << 1) ^ (value >> 31)) as u32;
    while v & !0x7F != 0 {
        // More groups follow: emit the low 7 bits with the continuation bit set.
        buf.push(((v & 0x7F) | 0x80) as u8);
        v >>= 7;
    }
    buf.push(v as u8);
    buf
}
2158
/// Encodes `value` the way Avro writes a `long`: zig-zag mapping followed by
/// a base-128 varint, least-significant 7-bit group first.
///
/// The zig-zag result is reinterpreted as `u64` before the loop so that the
/// right shifts are logical. With a signed accumulator, any input whose
/// zig-zag form has the top bit set (for example `i64::MIN` or `i64::MAX`)
/// is shifted arithmetically, never reaches zero, and the loop never ends.
fn encode_avro_long(value: i64) -> Vec<u8> {
    let mut buf = Vec::new();
    // Zig-zag: fold the sign into the lowest bit, then treat the bits as unsigned.
    let mut v = ((value << 1) ^ (value >> 63)) as u64;
    while v & !0x7F != 0 {
        // More groups follow: emit the low 7 bits with the continuation bit set.
        buf.push(((v & 0x7F) | 0x80) as u8);
        v >>= 7;
    }
    buf.push(v as u8);
    buf
}
2169
2170 fn encode_avro_bytes(bytes: &[u8]) -> Vec<u8> {
2171 let mut buf = encode_avro_long(bytes.len() as i64);
2172 buf.extend_from_slice(bytes);
2173 buf
2174 }
2175
2176 fn avro_from_codec(codec: Codec) -> AvroDataType {
2177 AvroDataType::new(codec, Default::default(), None)
2178 }
2179
2180 fn resolved_root_datatype(
2181 writer: Schema<'static>,
2182 reader: Schema<'static>,
2183 use_utf8view: bool,
2184 strict_mode: bool,
2185 ) -> AvroDataType {
2186 let writer_record = Schema::Complex(ComplexType::Record(Record {
2188 name: "Root",
2189 namespace: None,
2190 doc: None,
2191 aliases: vec![],
2192 fields: vec![Field {
2193 name: "v",
2194 r#type: writer,
2195 default: None,
2196 doc: None,
2197 aliases: vec![],
2198 }],
2199 attributes: Attributes::default(),
2200 }));
2201
2202 let reader_record = Schema::Complex(ComplexType::Record(Record {
2204 name: "Root",
2205 namespace: None,
2206 doc: None,
2207 aliases: vec![],
2208 fields: vec![Field {
2209 name: "v",
2210 r#type: reader,
2211 default: None,
2212 doc: None,
2213 aliases: vec![],
2214 }],
2215 attributes: Attributes::default(),
2216 }));
2217
2218 let field = AvroFieldBuilder::new(&writer_record)
2220 .with_reader_schema(&reader_record)
2221 .with_utf8view(use_utf8view)
2222 .with_strict_mode(strict_mode)
2223 .build()
2224 .expect("schema resolution should succeed");
2225
2226 match field.data_type().codec() {
2227 Codec::Struct(fields) => fields[0].data_type().clone(),
2228 other => panic!("expected wrapper struct, got {other:?}"),
2229 }
2230 }
2231
2232 fn decoder_for_promotion(
2233 writer: PrimitiveType,
2234 reader: PrimitiveType,
2235 use_utf8view: bool,
2236 ) -> Decoder {
2237 let ws = Schema::TypeName(TypeName::Primitive(writer));
2238 let rs = Schema::TypeName(TypeName::Primitive(reader));
2239 let dt = resolved_root_datatype(ws, rs, use_utf8view, false);
2240 Decoder::try_new(&dt).unwrap()
2241 }
2242
2243 fn make_avro_dt(codec: Codec, nullability: Option<Nullability>) -> AvroDataType {
2244 AvroDataType::new(codec, HashMap::new(), nullability)
2245 }
2246
2247 #[cfg(feature = "avro_custom_types")]
/// Encodes `x` as an unsigned base-128 varint (no zig-zag), least-significant
/// 7-bit group first; every byte except the last has its high bit set.
fn encode_vlq_u64(mut x: u64) -> Vec<u8> {
    let mut encoded = Vec::with_capacity(10);
    loop {
        let low_bits = (x & 0x7F) as u8;
        x >>= 7;
        if x == 0 {
            // Final group: no continuation bit.
            encoded.push(low_bits);
            return encoded;
        }
        encoded.push(low_bits | 0x80);
    }
}
2257
/// Writer union `[int, string]` read as reader union `[string, long]`: the
/// writer's `int` must land in the reader's `long` child (type id 1) and the
/// writer's `string` in the reader's `string` child (type id 0).
#[test]
fn test_union_resolution_writer_union_reader_union_reorder_and_promotion_dense() {
    let prim = |p: PrimitiveType| -> Schema<'static> { Schema::TypeName(TypeName::Primitive(p)) };
    let writer_schema = Schema::Union(vec![prim(PrimitiveType::Int), prim(PrimitiveType::String)]);
    let reader_schema = Schema::Union(vec![prim(PrimitiveType::String), prim(PrimitiveType::Long)]);

    let resolved = resolved_root_datatype(writer_schema, reader_schema, false, false);
    let mut decoder = Decoder::try_new(&resolved).unwrap();

    // Writer branch 0 (`int`) carrying 7.
    let int_row = [encode_avro_long(0), encode_avro_int(7)].concat();
    decoder.decode(&mut AvroCursor::new(&int_row)).unwrap();

    // Writer branch 1 (`string`) carrying "abc".
    let string_row = [encode_avro_long(1), encode_avro_bytes("abc".as_bytes())].concat();
    decoder.decode(&mut AvroCursor::new(&string_row)).unwrap();

    let flushed = decoder.flush(None).unwrap();
    let union = flushed
        .as_any()
        .downcast_ref::<UnionArray>()
        .expect("dense union output");

    assert_eq!(
        union.type_id(0),
        1,
        "first value must select reader 'long' branch"
    );
    assert_eq!(union.value_offset(0), 0);

    assert_eq!(
        union.type_id(1),
        0,
        "second value must select reader 'string' branch"
    );
    assert_eq!(union.value_offset(1), 0);

    let longs = union.child(1).as_any().downcast_ref::<Int64Array>().unwrap();
    assert_eq!(longs.len(), 1);
    assert_eq!(longs.value(0), 7);

    let strings = union.child(0).as_any().downcast_ref::<StringArray>().unwrap();
    assert_eq!(strings.len(), 1);
    assert_eq!(strings.value(0), "abc");
}
2310
/// Writer union `[int, string]` read as a plain `long`: a value written on
/// the `int` branch must come out as an `Int64` value.
#[test]
fn test_union_resolution_writer_union_reader_nonunion_promotion_int_to_long() {
    let prim = |p: PrimitiveType| -> Schema<'static> { Schema::TypeName(TypeName::Primitive(p)) };
    let writer_schema = Schema::Union(vec![prim(PrimitiveType::Int), prim(PrimitiveType::String)]);
    let reader_schema = prim(PrimitiveType::Long);

    let resolved = resolved_root_datatype(writer_schema, reader_schema, false, false);
    let mut decoder = Decoder::try_new(&resolved).unwrap();

    // Writer branch 0 (`int`) carrying 5.
    let row = [encode_avro_long(0), encode_avro_int(5)].concat();
    decoder.decode(&mut AvroCursor::new(&row)).unwrap();

    let flushed = decoder.flush(None).unwrap();
    let longs = flushed.as_any().downcast_ref::<Int64Array>().unwrap();
    assert_eq!(longs.len(), 1);
    assert_eq!(longs.value(0), 5);
}
2332
/// Writer union `[int, string]` read as a plain `long`: a value written on
/// the `string` branch cannot be resolved and decoding must fail.
#[test]
fn test_union_resolution_writer_union_reader_nonunion_mismatch_errors() {
    let prim = |p: PrimitiveType| -> Schema<'static> { Schema::TypeName(TypeName::Primitive(p)) };
    let writer_schema = Schema::Union(vec![prim(PrimitiveType::Int), prim(PrimitiveType::String)]);
    let reader_schema = prim(PrimitiveType::Long);

    let resolved = resolved_root_datatype(writer_schema, reader_schema, false, false);
    let mut decoder = Decoder::try_new(&resolved).unwrap();

    // Writer branch 1 (`string`) carrying "z".
    let row = [encode_avro_long(1), encode_avro_bytes("z".as_bytes())].concat();
    let mut cursor = AvroCursor::new(&row);
    let outcome = decoder.decode(&mut cursor);
    assert!(
        outcome.is_err(),
        "expected error when writer union branch does not resolve to reader non-union type"
    );
}
2353
/// Writer `int` read as reader union `[string, long]`: the value must be
/// placed in the `long` child (type id 1) and the `string` child stays empty.
#[test]
fn test_union_resolution_writer_nonunion_reader_union_selects_matching_branch() {
    let prim = |p: PrimitiveType| -> Schema<'static> { Schema::TypeName(TypeName::Primitive(p)) };
    let writer_schema = prim(PrimitiveType::Int);
    let reader_schema = Schema::Union(vec![prim(PrimitiveType::String), prim(PrimitiveType::Long)]);

    let resolved = resolved_root_datatype(writer_schema, reader_schema, false, false);
    let mut decoder = Decoder::try_new(&resolved).unwrap();

    // No branch tag on the wire: the writer schema is not a union.
    let row = encode_avro_int(6);
    decoder.decode(&mut AvroCursor::new(&row)).unwrap();

    let flushed = decoder.flush(None).unwrap();
    let union = flushed
        .as_any()
        .downcast_ref::<UnionArray>()
        .expect("dense union output");
    assert_eq!(union.len(), 1);
    assert_eq!(
        union.type_id(0),
        1,
        "must resolve to reader 'long' branch (type_id 1)"
    );
    assert_eq!(union.value_offset(0), 0);

    let longs = union.child(1).as_any().downcast_ref::<Int64Array>().unwrap();
    assert_eq!(longs.len(), 1);
    assert_eq!(longs.value(0), 6);

    let strings = union.child(0).as_any().downcast_ref::<StringArray>().unwrap();
    assert_eq!(strings.len(), 0, "string branch must be empty");
}
2389
/// Writer union `[int, boolean]` read as reader union `[string, long]`:
/// a value written on the `boolean` branch has no reader counterpart, so
/// decoding it must fail.
#[test]
fn test_union_resolution_writer_union_reader_union_unmapped_branch_errors() {
    let prim = |p: PrimitiveType| -> Schema<'static> { Schema::TypeName(TypeName::Primitive(p)) };
    let writer_schema = Schema::Union(vec![prim(PrimitiveType::Int), prim(PrimitiveType::Boolean)]);
    let reader_schema = Schema::Union(vec![prim(PrimitiveType::String), prim(PrimitiveType::Long)]);

    let resolved = resolved_root_datatype(writer_schema, reader_schema, false, false);
    let mut decoder = Decoder::try_new(&resolved).unwrap();

    // Writer branch 1 (`boolean`) followed by a single payload byte of 1.
    let row = [encode_avro_long(1), vec![1_u8]].concat();
    let mut cursor = AvroCursor::new(&row);
    let outcome = decoder.decode(&mut cursor);
    assert!(
        outcome.is_err(),
        "expected error for unmapped writer 'boolean' branch"
    );
}
2413
/// Writer `int` resolved against reader `long` must select the
/// `Int32ToInt64` decoder and yield the written values as `Int64`.
#[test]
fn test_schema_resolution_promotion_int_to_long() {
    let mut decoder = decoder_for_promotion(PrimitiveType::Int, PrimitiveType::Long, false);
    assert!(matches!(decoder, Decoder::Int32ToInt64(_)));

    // (value written as `int`, value expected back as `long`)
    let cases: [(i32, i64); 4] = [(0, 0), (1, 1), (-2, -2), (123456, 123456)];
    for (written, _) in cases {
        let bytes = encode_avro_int(written);
        decoder.decode(&mut AvroCursor::new(&bytes)).unwrap();
    }

    let flushed = decoder.flush(None).unwrap();
    let longs = flushed.as_any().downcast_ref::<Int64Array>().unwrap();
    for (row, (_, expected)) in cases.iter().enumerate() {
        assert_eq!(longs.value(row), *expected);
    }
}
2430
/// Writer `int` resolved against reader `float` must select the
/// `Int32ToFloat32` decoder and yield the written values as `Float32`.
#[test]
fn test_schema_resolution_promotion_int_to_float() {
    let mut decoder = decoder_for_promotion(PrimitiveType::Int, PrimitiveType::Float, false);
    assert!(matches!(decoder, Decoder::Int32ToFloat32(_)));

    // (value written as `int`, value expected back as `float`)
    let cases: [(i32, f32); 3] = [(0, 0.0), (42, 42.0), (-7, -7.0)];
    for (written, _) in cases {
        let bytes = encode_avro_int(written);
        decoder.decode(&mut AvroCursor::new(&bytes)).unwrap();
    }

    let flushed = decoder.flush(None).unwrap();
    let floats = flushed.as_any().downcast_ref::<Float32Array>().unwrap();
    for (row, (_, expected)) in cases.iter().enumerate() {
        assert_eq!(floats.value(row), *expected);
    }
}
2446
/// Writer `int` resolved against reader `double` must select the
/// `Int32ToFloat64` decoder and yield the written values as `Float64`.
#[test]
fn test_schema_resolution_promotion_int_to_double() {
    let mut decoder = decoder_for_promotion(PrimitiveType::Int, PrimitiveType::Double, false);
    assert!(matches!(decoder, Decoder::Int32ToFloat64(_)));

    // (value written as `int`, value expected back as `double`)
    let cases: [(i32, f64); 3] = [(1, 1.0), (-1, -1.0), (10_000, 10_000.0)];
    for (written, _) in cases {
        let bytes = encode_avro_int(written);
        decoder.decode(&mut AvroCursor::new(&bytes)).unwrap();
    }

    let flushed = decoder.flush(None).unwrap();
    let doubles = flushed.as_any().downcast_ref::<Float64Array>().unwrap();
    for (row, (_, expected)) in cases.iter().enumerate() {
        assert_eq!(doubles.value(row), *expected);
    }
}
2462
/// Writer `long` resolved against reader `float` must select the
/// `Int64ToFloat32` decoder and yield the written values as `Float32`.
#[test]
fn test_schema_resolution_promotion_long_to_float() {
    let mut decoder = decoder_for_promotion(PrimitiveType::Long, PrimitiveType::Float, false);
    assert!(matches!(decoder, Decoder::Int64ToFloat32(_)));

    // (value written as `long`, value expected back as `float`)
    let cases: [(i64, f32); 3] = [(0, 0.0), (1_000_000, 1_000_000.0), (-123, -123.0)];
    for (written, _) in cases {
        let bytes = encode_avro_long(written);
        decoder.decode(&mut AvroCursor::new(&bytes)).unwrap();
    }

    let flushed = decoder.flush(None).unwrap();
    let floats = flushed.as_any().downcast_ref::<Float32Array>().unwrap();
    for (row, (_, expected)) in cases.iter().enumerate() {
        assert_eq!(floats.value(row), *expected);
    }
}
2478
/// Writer `long` resolved against reader `double` must select the
/// `Int64ToFloat64` decoder and yield the written values as `Float64`.
#[test]
fn test_schema_resolution_promotion_long_to_double() {
    let mut decoder = decoder_for_promotion(PrimitiveType::Long, PrimitiveType::Double, false);
    assert!(matches!(decoder, Decoder::Int64ToFloat64(_)));

    // (value written as `long`, value expected back as `double`)
    let cases: [(i64, f64); 3] = [(2, 2.0), (-2, -2.0), (9_223_372, 9_223_372.0)];
    for (written, _) in cases {
        let bytes = encode_avro_long(written);
        decoder.decode(&mut AvroCursor::new(&bytes)).unwrap();
    }

    let flushed = decoder.flush(None).unwrap();
    let doubles = flushed.as_any().downcast_ref::<Float64Array>().unwrap();
    for (row, (_, expected)) in cases.iter().enumerate() {
        assert_eq!(doubles.value(row), *expected);
    }
}
2494
/// Writer `float` resolved against reader `double` must select the
/// `Float32ToFloat64` decoder and yield the written values as `Float64`.
#[test]
fn test_schema_resolution_promotion_float_to_double() {
    let mut decoder = decoder_for_promotion(PrimitiveType::Float, PrimitiveType::Double, false);
    assert!(matches!(decoder, Decoder::Float32ToFloat64(_)));

    // (value written as little-endian `f32`, value expected back as `f64`)
    let cases: [(f32, f64); 3] = [(0.5, 0.5), (-3.25, -3.25), (1.0e6, 1.0e6)];
    for (written, _) in cases {
        let bytes = written.to_le_bytes().to_vec();
        decoder.decode(&mut AvroCursor::new(&bytes)).unwrap();
    }

    let flushed = decoder.flush(None).unwrap();
    let doubles = flushed.as_any().downcast_ref::<Float64Array>().unwrap();
    for (row, (_, expected)) in cases.iter().enumerate() {
        assert_eq!(doubles.value(row), *expected);
    }
}
2510
/// Writer `bytes` resolved against reader `string` must select the
/// `BytesToString` decoder and return the payloads as UTF-8 strings,
/// including a non-ASCII one.
#[test]
fn test_schema_resolution_promotion_bytes_to_string_utf8() {
    let mut decoder = decoder_for_promotion(PrimitiveType::Bytes, PrimitiveType::String, false);
    assert!(matches!(decoder, Decoder::BytesToString(_, _)));

    let words = ["hello", "world", "héllo"];
    for word in words {
        let bytes = encode_avro_bytes(word.as_bytes());
        decoder.decode(&mut AvroCursor::new(&bytes)).unwrap();
    }

    let flushed = decoder.flush(None).unwrap();
    let strings = flushed.as_any().downcast_ref::<StringArray>().unwrap();
    for (row, word) in words.iter().enumerate() {
        assert_eq!(strings.value(row), *word);
    }
}
2526
/// Same `bytes` -> `string` promotion with the `use_utf8view` flag turned on:
/// the decoder is still `BytesToString` and the flushed array is still read
/// back here as a `StringArray`.
#[test]
fn test_schema_resolution_promotion_bytes_to_string_utf8view_enabled() {
    let mut decoder = decoder_for_promotion(PrimitiveType::Bytes, PrimitiveType::String, true);
    assert!(matches!(decoder, Decoder::BytesToString(_, _)));

    let bytes = encode_avro_bytes("abc".as_bytes());
    decoder.decode(&mut AvroCursor::new(&bytes)).unwrap();

    let flushed = decoder.flush(None).unwrap();
    let strings = flushed.as_any().downcast_ref::<StringArray>().unwrap();
    assert_eq!(strings.value(0), "abc");
}
2538
/// Writer `string` resolved against reader `bytes` must select the
/// `StringToBytes` decoder and return the raw bytes of each string,
/// including the empty one.
#[test]
fn test_schema_resolution_promotion_string_to_bytes() {
    let mut decoder = decoder_for_promotion(PrimitiveType::String, PrimitiveType::Bytes, false);
    assert!(matches!(decoder, Decoder::StringToBytes(_, _)));

    let words = ["", "abc", "data"];
    for word in words {
        let bytes = encode_avro_bytes(word.as_bytes());
        decoder.decode(&mut AvroCursor::new(&bytes)).unwrap();
    }

    let flushed = decoder.flush(None).unwrap();
    let binaries = flushed.as_any().downcast_ref::<BinaryArray>().unwrap();
    for (row, word) in words.iter().enumerate() {
        assert_eq!(binaries.value(row), word.as_bytes());
    }
}
2554
/// Identical writer and reader `int` schemas must resolve to the plain
/// `Int32` decoder (no promotion variant) and decode values unchanged.
///
/// Resolution goes through the shared `resolved_root_datatype` helper, which
/// builds the same one-field `Root` record wrappers (utf8view off, strict
/// mode off) that this test used to spell out by hand.
#[test]
fn test_schema_resolution_no_promotion_passthrough_int() {
    let ws = Schema::TypeName(TypeName::Primitive(PrimitiveType::Int));
    let rs = Schema::TypeName(TypeName::Primitive(PrimitiveType::Int));
    let dt = resolved_root_datatype(ws, rs, false, false);
    let mut dec = Decoder::try_new(&dt).unwrap();
    assert!(matches!(dec, Decoder::Int32(_)));
    for v in [7, -9] {
        let data = encode_avro_int(v);
        let mut cur = AvroCursor::new(&data);
        dec.decode(&mut cur).unwrap();
    }
    let arr = dec.flush(None).unwrap();
    let a = arr.as_any().downcast_ref::<Int32Array>().unwrap();
    assert_eq!(a.value(0), 7);
    assert_eq!(a.value(1), -9);
}
2611
/// Writer `int` against reader `boolean` is not a legal promotion: building
/// the resolved field must return an error.
#[test]
fn test_schema_resolution_illegal_promotion_int_to_boolean_errors() {
    // One-field `Root` record around the given field type, used for both sides.
    let root_record = |field_type: Schema<'static>| {
        Schema::Complex(ComplexType::Record(Record {
            name: "Root",
            namespace: None,
            doc: None,
            aliases: vec![],
            fields: vec![Field {
                name: "v",
                r#type: field_type,
                default: None,
                doc: None,
                aliases: vec![],
            }],
            attributes: Attributes::default(),
        }))
    };
    let writer_record = root_record(Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)));
    let reader_record =
        root_record(Schema::TypeName(TypeName::Primitive(PrimitiveType::Boolean)));

    let outcome = AvroFieldBuilder::new(&writer_record)
        .with_reader_schema(&reader_record)
        .with_utf8view(false)
        .with_strict_mode(false)
        .build();
    assert!(outcome.is_err(), "expected error for illegal promotion");
}
2651
/// A map with a single `"hello" -> "world"` entry decodes into a one-row
/// `MapArray` whose entries struct holds that key and value.
#[test]
fn test_map_decoding_one_entry() {
    /// Fetches the named entries column as a `StringArray`.
    fn utf8_column<'a>(entries: &'a StructArray, name: &str) -> &'a StringArray {
        entries
            .column_by_name(name)
            .unwrap()
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap()
    }

    let map_type = avro_from_codec(Codec::Map(Arc::new(avro_from_codec(Codec::Utf8))));
    let mut decoder = Decoder::try_new(&map_type).unwrap();

    let data = [
        encode_avro_long(1),         // block holding one entry
        encode_avro_bytes(b"hello"), // key
        encode_avro_bytes(b"world"), // value
        encode_avro_long(0),         // zero count closes the map
    ]
    .concat();
    decoder.decode(&mut AvroCursor::new(&data)).unwrap();

    let flushed = decoder.flush(None).unwrap();
    let map_arr = flushed.as_any().downcast_ref::<MapArray>().unwrap();
    assert_eq!(map_arr.len(), 1);
    assert_eq!(map_arr.value_length(0), 1);

    let first_row = map_arr.value(0);
    let entries = first_row.as_any().downcast_ref::<StructArray>().unwrap();
    assert_eq!(entries.len(), 1);
    assert_eq!(utf8_column(entries, "key").value(0), "hello");
    assert_eq!(utf8_column(entries, "value").value(0), "world");
}
2687
/// A map encoded as just a zero count decodes into one row with no entries.
#[test]
fn test_map_decoding_empty() {
    let map_type = avro_from_codec(Codec::Map(Arc::new(avro_from_codec(Codec::Utf8))));
    let mut decoder = Decoder::try_new(&map_type).unwrap();

    let only_terminator = encode_avro_long(0);
    let mut cursor = AvroCursor::new(&only_terminator);
    decoder.decode(&mut cursor).unwrap();

    let flushed = decoder.flush(None).unwrap();
    let map_arr = flushed.as_any().downcast_ref::<MapArray>().unwrap();
    assert_eq!(map_arr.len(), 1);
    assert_eq!(map_arr.value_length(0), 0);
}
2700
/// Two 3-byte `fixed` values decode into a `FixedSizeBinaryArray` of width 3,
/// and each decode advances its cursor by exactly 3 bytes.
#[test]
fn test_fixed_decoding() {
    let fixed_type = avro_from_codec(Codec::Fixed(3));
    let mut decoder = Decoder::try_new(&fixed_type).expect("Failed to create decoder");

    // (payload, message used if decoding that payload fails)
    let rows: [([u8; 3], &str); 2] = [
        ([1, 2, 3], "Failed to decode data1"),
        ([4, 5, 6], "Failed to decode data2"),
    ];
    for (payload, failure) in rows {
        let mut cursor = AvroCursor::new(&payload);
        decoder.decode(&mut cursor).expect(failure);
        assert_eq!(cursor.position(), 3, "Cursor should advance by fixed size");
    }

    let flushed = decoder.flush(None).expect("Failed to flush decoder");
    assert_eq!(flushed.len(), 2, "Array should contain two items");
    let fixed = flushed
        .as_any()
        .downcast_ref::<FixedSizeBinaryArray>()
        .expect("Failed to downcast to FixedSizeBinaryArray");
    assert_eq!(
        fixed.value_length(),
        3,
        "Fixed size of binary values should be 3"
    );
    assert_eq!(fixed.value(0), &[1, 2, 3], "First item mismatch");
    assert_eq!(fixed.value(1), &[4, 5, 6], "Second item mismatch");
}
2740
/// Flushing a `fixed(5)` decoder that never decoded anything yields an empty
/// `FixedSizeBinaryArray` that still reports a value width of 5.
#[test]
fn test_fixed_decoding_empty() {
    let fixed_type = avro_from_codec(Codec::Fixed(5));
    let mut decoder = Decoder::try_new(&fixed_type).expect("Failed to create decoder");

    let flushed = decoder
        .flush(None)
        .expect("Failed to flush decoder for empty input");
    assert_eq!(flushed.len(), 0, "Array should be empty");

    let fixed = flushed
        .as_any()
        .downcast_ref::<FixedSizeBinaryArray>()
        .expect("Failed to downcast to FixedSizeBinaryArray for empty array");
    assert_eq!(
        fixed.value_length(),
        5,
        "Fixed size of binary values should be 5 as per type"
    );
}
2762
/// A UUID written as its hyphenated text form decodes into a single 16-byte
/// `FixedSizeBinaryArray` value holding the UUID's raw bytes, and the cursor
/// consumes the whole input.
#[test]
fn test_uuid_decoding() {
    let uuid_type = avro_from_codec(Codec::Uuid);
    let mut decoder = Decoder::try_new(&uuid_type).expect("Failed to create decoder");

    let text = "f81d4fae-7dec-11d0-a765-00a0c91e6bf6";
    let encoded = encode_avro_bytes(text.as_bytes());
    let mut cursor = AvroCursor::new(&encoded);
    decoder.decode(&mut cursor).expect("Failed to decode data");
    assert_eq!(
        cursor.position(),
        encoded.len(),
        "Cursor should advance by varint size + data size"
    );

    let flushed = decoder.flush(None).expect("Failed to flush decoder");
    let fixed = flushed
        .as_any()
        .downcast_ref::<FixedSizeBinaryArray>()
        .expect("Array should be a FixedSizeBinaryArray");
    assert_eq!(fixed.len(), 1);
    assert_eq!(fixed.value_length(), 16);

    let raw_uuid = [
        0xf8, 0x1d, 0x4f, 0xae, 0x7d, 0xec, 0x11, 0xd0, 0xa7, 0x65, 0x00, 0xa0, 0xc9, 0x1e,
        0x6b, 0xf6,
    ];
    assert_eq!(fixed.value(0), &raw_uuid);
}
2789
/// Decodes two `array<int>` rows, `[10, 20]` and `[]`, and checks the list
/// offsets and the flattened child values.
#[test]
fn test_array_decoding() {
    let list_type = avro_from_codec(Codec::List(Arc::new(avro_from_codec(Codec::Int32))));
    let mut decoder = Decoder::try_new(&list_type).unwrap();

    let first_row = [
        encode_avro_long(2), // block of two items
        encode_avro_int(10),
        encode_avro_int(20),
        encode_avro_long(0), // zero count closes the array
    ]
    .concat();
    let second_row = encode_avro_long(0);
    for row in [&first_row, &second_row] {
        decoder.decode(&mut AvroCursor::new(row)).unwrap();
    }

    let flushed = decoder.flush(None).unwrap();
    let list_arr = flushed.as_any().downcast_ref::<ListArray>().unwrap();
    assert_eq!(list_arr.len(), 2);
    assert_eq!(list_arr.value_offsets(), &[0, 2, 2]);

    let items = list_arr.values().as_primitive::<Int32Type>();
    assert_eq!(items.len(), 2);
    assert_eq!(items.value(0), 10);
    assert_eq!(items.value(1), 20);
}
2816
/// An array block written with a negative count (-3) followed by an extra
/// long must still decode as three items.
#[test]
fn test_array_decoding_with_negative_block_count() {
    let list_type = avro_from_codec(Codec::List(Arc::new(avro_from_codec(Codec::Int32))));
    let mut decoder = Decoder::try_new(&list_type).unwrap();

    let data = [
        encode_avro_long(-3), // negative block count
        encode_avro_long(12), // long that follows a negative count
        encode_avro_int(1),
        encode_avro_int(2),
        encode_avro_int(3),
        encode_avro_long(0), // zero count closes the array
    ]
    .concat();
    decoder.decode(&mut AvroCursor::new(&data)).unwrap();

    let flushed = decoder.flush(None).unwrap();
    let list_arr = flushed.as_any().downcast_ref::<ListArray>().unwrap();
    assert_eq!(list_arr.len(), 1);
    assert_eq!(list_arr.value_length(0), 3);

    let items = list_arr.values().as_primitive::<Int32Type>();
    assert_eq!(items.len(), 3);
    for (row, expected) in [1, 2, 3].iter().enumerate() {
        assert_eq!(items.value(row), *expected);
    }
}
2840
/// Decodes one `array<array<int>>` row equal to `[[5, 6]]` and checks both
/// list levels and the innermost values.
#[test]
fn test_nested_array_decoding() {
    let inner_ty = avro_from_codec(Codec::List(Arc::new(avro_from_codec(Codec::Int32))));
    // `inner_ty` is not used again, so it is moved into the `Arc` instead of cloned.
    let nested_ty = avro_from_codec(Codec::List(Arc::new(inner_ty)));
    let mut decoder = Decoder::try_new(&nested_ty).unwrap();
    let mut buf = Vec::new();
    // Outer array: one block with one item.
    buf.extend(encode_avro_long(1));
    // Inner array: one block with two items, 5 and 6.
    buf.extend(encode_avro_long(2));
    buf.extend(encode_avro_int(5));
    buf.extend(encode_avro_int(6));
    // Zero counts close the inner array, then the outer array.
    buf.extend(encode_avro_long(0));
    buf.extend(encode_avro_long(0));
    let mut cursor = AvroCursor::new(&buf);
    decoder.decode(&mut cursor).unwrap();
    let arr = decoder.flush(None).unwrap();
    let outer = arr.as_any().downcast_ref::<ListArray>().unwrap();
    assert_eq!(outer.len(), 1);
    assert_eq!(outer.value_length(0), 1);
    let inner = outer.values().as_any().downcast_ref::<ListArray>().unwrap();
    assert_eq!(inner.len(), 1);
    assert_eq!(inner.value_length(0), 2);
    let values = inner
        .values()
        .as_any()
        .downcast_ref::<Int32Array>()
        .unwrap();
    assert_eq!(values.values(), &[5, 6]);
}
2869
/// An `array<string>` encoded as just a zero count decodes into one row
/// containing no items.
#[test]
fn test_array_decoding_empty_array() {
    let list_type = avro_from_codec(Codec::List(Arc::new(avro_from_codec(Codec::Utf8))));
    let mut decoder = Decoder::try_new(&list_type).unwrap();

    let only_terminator = encode_avro_long(0);
    let mut cursor = AvroCursor::new(&only_terminator);
    decoder.decode(&mut cursor).unwrap();

    let flushed = decoder.flush(None).unwrap();
    let list_arr = flushed.as_any().downcast_ref::<ListArray>().unwrap();
    assert_eq!(list_arr.len(), 1);
    assert_eq!(list_arr.value_length(0), 0);
}
2882
/// Decimal(precision 50, scale 2) stored as 32-byte fixed values decodes
/// into a `Decimal256Array`; checks one positive and one negative value.
#[test]
fn test_decimal_decoding_fixed256() {
    let decimal_type = avro_from_codec(Codec::Decimal(50, Some(2), Some(32)));
    let mut decoder = Decoder::try_new(&decimal_type).unwrap();

    // Big-endian two's complement: 0x3039 = 12345, sign-extended 0x85 = -123.
    let mut positive = [0x00_u8; 32];
    positive[30] = 0x30;
    positive[31] = 0x39;
    let mut negative = [0xFF_u8; 32];
    negative[31] = 0x85;

    let data = [positive, negative].concat();
    let mut cursor = AvroCursor::new(&data);
    for _ in 0..2 {
        decoder.decode(&mut cursor).unwrap();
    }

    let flushed = decoder.flush(None).unwrap();
    let decimals = flushed.as_any().downcast_ref::<Decimal256Array>().unwrap();
    assert_eq!(decimals.len(), 2);
    assert_eq!(decimals.value_as_string(0), "123.45");
    assert_eq!(decimals.value_as_string(1), "-1.23");
}
2909
/// Decimal(precision 28, scale 2) stored as 16-byte fixed values decodes
/// into a `Decimal128Array`; checks one positive and one negative value.
#[test]
fn test_decimal_decoding_fixed128() {
    let decimal_type = avro_from_codec(Codec::Decimal(28, Some(2), Some(16)));
    let mut decoder = Decoder::try_new(&decimal_type).unwrap();

    // Big-endian two's complement: 0x3039 = 12345, sign-extended 0x85 = -123.
    let mut positive = [0x00_u8; 16];
    positive[14] = 0x30;
    positive[15] = 0x39;
    let mut negative = [0xFF_u8; 16];
    negative[15] = 0x85;

    let data = [positive, negative].concat();
    let mut cursor = AvroCursor::new(&data);
    for _ in 0..2 {
        decoder.decode(&mut cursor).unwrap();
    }

    let flushed = decoder.flush(None).unwrap();
    let decimals = flushed.as_any().downcast_ref::<Decimal128Array>().unwrap();
    assert_eq!(decimals.len(), 2);
    assert_eq!(decimals.value_as_string(0), "123.45");
    assert_eq!(decimals.value_as_string(1), "-1.23");
}
2934
/// Decimal(precision 5, scale 2) stored as 32-byte fixed values: the flushed
/// array is a `Decimal32Array` when the `small_decimals` feature is on and a
/// `Decimal128Array` otherwise.
#[test]
fn test_decimal_decoding_fixed32_from_32byte_fixed_storage() {
    // Array type this test expects for the active feature set.
    #[cfg(feature = "small_decimals")]
    type ExpectedArray = Decimal32Array;
    #[cfg(not(feature = "small_decimals"))]
    type ExpectedArray = Decimal128Array;

    let decimal_type = avro_from_codec(Codec::Decimal(5, Some(2), Some(32)));
    let mut decoder = Decoder::try_new(&decimal_type).unwrap();

    // Big-endian two's complement: 0x3039 = 12345, sign-extended 0x85 = -123.
    let mut positive = [0x00_u8; 32];
    positive[30] = 0x30;
    positive[31] = 0x39;
    let mut negative = [0xFF_u8; 32];
    negative[31] = 0x85;

    let data = [positive, negative].concat();
    let mut cursor = AvroCursor::new(&data);
    for _ in 0..2 {
        decoder.decode(&mut cursor).unwrap();
    }

    let flushed = decoder.flush(None).unwrap();
    let decimals = flushed.as_any().downcast_ref::<ExpectedArray>().unwrap();
    assert_eq!(decimals.len(), 2);
    assert_eq!(decimals.value_as_string(0), "123.45");
    assert_eq!(decimals.value_as_string(1), "-1.23");
}
2971
/// Decimal(precision 5, scale 2) stored as 16-byte fixed values: the flushed
/// array is a `Decimal32Array` when the `small_decimals` feature is on and a
/// `Decimal128Array` otherwise.
#[test]
fn test_decimal_decoding_fixed32_from_16byte_fixed_storage() {
    // Array type this test expects for the active feature set.
    #[cfg(feature = "small_decimals")]
    type ExpectedArray = Decimal32Array;
    #[cfg(not(feature = "small_decimals"))]
    type ExpectedArray = Decimal128Array;

    let decimal_type = avro_from_codec(Codec::Decimal(5, Some(2), Some(16)));
    let mut decoder = Decoder::try_new(&decimal_type).unwrap();

    // Big-endian two's complement: 0x3039 = 12345, sign-extended 0x85 = -123.
    let mut positive = [0x00_u8; 16];
    positive[14] = 0x30;
    positive[15] = 0x39;
    let mut negative = [0xFF_u8; 16];
    negative[15] = 0x85;

    let data = [positive, negative].concat();
    let mut cursor = AvroCursor::new(&data);
    for _ in 0..2 {
        decoder.decode(&mut cursor).unwrap();
    }

    let flushed = decoder.flush(None).unwrap();
    let decimals = flushed.as_any().downcast_ref::<ExpectedArray>().unwrap();
    assert_eq!(decimals.len(), 2);
    assert_eq!(decimals.value_as_string(0), "123.45");
    assert_eq!(decimals.value_as_string(1), "-1.23");
}
3007
/// Nullable (null-second) decimal(precision 4, scale 1) stored as `bytes`:
/// decodes value, null, value and checks validity plus both rendered values.
/// The array type is `Decimal32Array` with the `small_decimals` feature and
/// `Decimal128Array` without it.
#[test]
fn test_decimal_decoding_bytes_with_nulls() {
    // Array type this test expects for the active feature set.
    #[cfg(feature = "small_decimals")]
    type ExpectedArray = Decimal32Array;
    #[cfg(not(feature = "small_decimals"))]
    type ExpectedArray = Decimal128Array;

    let decimal_type = avro_from_codec(Codec::Decimal(4, Some(1), None));
    let value_decoder = Decoder::try_new(&decimal_type).unwrap();
    let mut decoder = Decoder::Nullable(
        Nullability::NullSecond,
        NullBufferBuilder::new(DEFAULT_CAPACITY),
        Box::new(value_decoder),
        NullablePlan::ReadTag,
    );

    // With null-second ordering, tag 0 carries a value and tag 1 is null.
    let data = [
        encode_avro_int(0),
        encode_avro_bytes(&[0x04, 0xD2]), // 0x04D2 = 1234
        encode_avro_int(1),
        encode_avro_int(0),
        encode_avro_bytes(&[0xFB, 0x2E]), // 0xFB2E = -1234
    ]
    .concat();
    let mut cursor = AvroCursor::new(&data);
    for _ in 0..3 {
        decoder.decode(&mut cursor).unwrap();
    }

    let flushed = decoder.flush(None).unwrap();
    let decimals = flushed.as_any().downcast_ref::<ExpectedArray>().unwrap();
    assert_eq!(decimals.len(), 3);
    assert!(decimals.is_valid(0));
    assert!(!decimals.is_valid(1));
    assert!(decimals.is_valid(2));
    assert_eq!(decimals.value_as_string(0), "123.4");
    assert_eq!(decimals.value_as_string(2), "-123.4");
}
3050
#[test]
fn test_decimal_decoding_bytes_with_nulls_fixed_size_narrow_result() {
    // The input is a 16-byte fixed decimal; the resulting Arrow array type
    // depends on whether the `small_decimals` feature is enabled.
    #[cfg(feature = "small_decimals")]
    type Expected = Decimal32Array;
    #[cfg(not(feature = "small_decimals"))]
    type Expected = Decimal128Array;

    let avro_type = avro_from_codec(Codec::Decimal(6, Some(2), Some(16)));
    let value_decoder = Decoder::try_new(&avro_type).unwrap();
    let mut decoder = Decoder::Nullable(
        Nullability::NullSecond,
        NullBufferBuilder::new(DEFAULT_CAPACITY),
        Box::new(value_decoder),
        NullablePlan::ReadTag,
    );
    // 16-byte big-endian two's-complement encodings of +123456 and -123456.
    let positive: [u8; 16] = 123_456_i128.to_be_bytes();
    let negative: [u8; 16] = (-123_456_i128).to_be_bytes();
    // Three rows: value (tag 0), null (tag 1), value (tag 0).
    let mut data = encode_avro_int(0);
    data.extend_from_slice(&positive);
    data.extend(encode_avro_int(1));
    data.extend(encode_avro_int(0));
    data.extend_from_slice(&negative);
    let mut cursor = AvroCursor::new(&data);
    for _ in 0..3 {
        decoder.decode(&mut cursor).unwrap();
    }
    let flushed = decoder.flush(None).unwrap();
    let decimals = flushed.as_any().downcast_ref::<Expected>().unwrap();
    assert_eq!(decimals.len(), 3);
    assert!(decimals.is_valid(0));
    assert!(!decimals.is_valid(1));
    assert!(decimals.is_valid(2));
    assert_eq!(decimals.value_as_string(0), "1234.56");
    assert_eq!(decimals.value_as_string(2), "-1234.56");
}
3101
#[test]
fn test_enum_decoding() {
    let symbols: Arc<[String]> = ["A", "B", "C"].into_iter().map(String::from).collect();
    let avro_type = avro_from_codec(Codec::Enum(symbols));
    let mut decoder = Decoder::try_new(&avro_type).unwrap();
    // Enum indices written for the three rows, in order.
    let data = [2, 0, 1].map(encode_avro_int).concat();
    let mut cursor = AvroCursor::new(&data);
    for _ in 0..3 {
        decoder.decode(&mut cursor).unwrap();
    }
    let flushed = decoder.flush(None).unwrap();
    let dictionary = flushed
        .as_any()
        .downcast_ref::<DictionaryArray<Int32Type>>()
        .unwrap();
    assert_eq!(dictionary.len(), 3);
    let labels = dictionary
        .values()
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    // Dictionary values keep the symbol order of the schema.
    for (slot, want) in ["A", "B", "C"].into_iter().enumerate() {
        assert_eq!(labels.value(slot), want);
    }
    assert_eq!(dictionary.keys().values(), &[2, 0, 1]);
}
3131
#[test]
fn test_enum_decoding_with_nulls() {
    let symbols: Arc<[String]> = ["X", "Y"].into_iter().map(String::from).collect();
    let avro_type = AvroDataType::new(
        Codec::Enum(symbols),
        Default::default(),
        Some(Nullability::NullFirst),
    );
    let mut decoder = Decoder::try_new(&avro_type).unwrap();
    // Rows: (tag 1, index 1), (tag 0 => null), (tag 1, index 0).
    let data = [
        encode_avro_long(1),
        encode_avro_int(1),
        encode_avro_long(0),
        encode_avro_long(1),
        encode_avro_int(0),
    ]
    .concat();
    let mut cursor = AvroCursor::new(&data);
    for _ in 0..3 {
        decoder.decode(&mut cursor).unwrap();
    }
    let flushed = decoder.flush(None).unwrap();
    let dictionary = flushed
        .as_any()
        .downcast_ref::<DictionaryArray<Int32Type>>()
        .unwrap();
    assert_eq!(dictionary.len(), 3);
    assert!(dictionary.is_valid(0));
    assert!(dictionary.is_null(1));
    assert!(dictionary.is_valid(2));
    let want_keys = Int32Array::from(vec![Some(1), None, Some(0)]);
    assert_eq!(dictionary.keys(), &want_keys);
    let labels = dictionary
        .values()
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    assert_eq!(labels.value(0), "X");
    assert_eq!(labels.value(1), "Y");
}
3168
#[test]
fn test_duration_decoding_with_nulls() {
    let avro_type = AvroDataType::new(
        Codec::Interval,
        Default::default(),
        Some(Nullability::NullFirst),
    );
    let mut decoder = Decoder::try_new(&avro_type).unwrap();
    // Each interval payload is three little-endian u32 values back to back.
    let payload = |parts: [u32; 3]| -> Vec<u8> {
        parts.iter().flat_map(|part| part.to_le_bytes()).collect()
    };
    // Rows: (tag 1, payload), (tag 0 => null), (tag 1, payload).
    let mut data = Vec::new();
    data.extend(encode_avro_long(1));
    data.extend(payload([1, 2, 3]));
    data.extend(encode_avro_long(0));
    data.extend(encode_avro_long(1));
    data.extend(payload([4, 5, 6]));
    let mut cursor = AvroCursor::new(&data);
    for _ in 0..3 {
        decoder.decode(&mut cursor).unwrap();
    }
    let flushed = decoder.flush(None).unwrap();
    let intervals = flushed
        .as_any()
        .downcast_ref::<IntervalMonthDayNanoArray>()
        .unwrap();
    assert_eq!(intervals.len(), 3);
    assert!(intervals.is_valid(0));
    assert!(intervals.is_null(1));
    assert!(intervals.is_valid(2));
    // The third input component (3 and 6) is expected back as 3_000_000 and
    // 6_000_000 nanoseconds respectively.
    let interval = |months: i32, days: i32, nanoseconds: i64| IntervalMonthDayNano {
        months,
        days,
        nanoseconds,
    };
    let expected = IntervalMonthDayNanoArray::from(vec![
        Some(interval(1, 2, 3_000_000)),
        None,
        Some(interval(4, 5, 6_000_000)),
    ]);
    assert_eq!(intervals, &expected);
}
3222
#[test]
fn test_duration_decoding_empty() {
    // Flushing a freshly built interval decoder must yield an empty array.
    let avro_type = AvroDataType::new(Codec::Interval, Default::default(), None);
    let mut decoder = Decoder::try_new(&avro_type).unwrap();
    let flushed = decoder.flush(None).unwrap();
    assert_eq!(flushed.len(), 0);
}
3231
#[test]
#[cfg(feature = "avro_custom_types")]
fn test_duration_seconds_decoding() {
    let avro_type = AvroDataType::new(Codec::DurationSeconds, Default::default(), None);
    let mut decoder = Decoder::try_new(&avro_type).unwrap();
    // Three long-encoded rows, including a negative value.
    let data: Vec<u8> = [0i64, -1, 2]
        .into_iter()
        .flat_map(encode_avro_long)
        .collect();
    let mut cursor = AvroCursor::new(&data);
    for _ in 0..3 {
        decoder.decode(&mut cursor).unwrap();
    }
    let flushed = decoder.flush(None).unwrap();
    let seconds = flushed
        .as_any()
        .downcast_ref::<DurationSecondArray>()
        .unwrap();
    assert_eq!(seconds.values(), &[0, -1, 2]);
}
3253
#[test]
#[cfg(feature = "avro_custom_types")]
fn test_duration_milliseconds_decoding() {
    let avro_type = AvroDataType::new(Codec::DurationMillis, Default::default(), None);
    let mut decoder = Decoder::try_new(&avro_type).unwrap();
    // Three long-encoded rows concatenated into one buffer.
    let data = [1i64, 0, -2].map(encode_avro_long).concat();
    let mut cursor = AvroCursor::new(&data);
    decoder.decode(&mut cursor).unwrap();
    decoder.decode(&mut cursor).unwrap();
    decoder.decode(&mut cursor).unwrap();
    let flushed = decoder.flush(None).unwrap();
    let millis = flushed
        .as_any()
        .downcast_ref::<DurationMillisecondArray>()
        .unwrap();
    assert_eq!(millis.values(), &[1, 0, -2]);
}
3274
#[test]
#[cfg(feature = "avro_custom_types")]
fn test_duration_microseconds_decoding() {
    let avro_type = AvroDataType::new(Codec::DurationMicros, Default::default(), None);
    let mut decoder = Decoder::try_new(&avro_type).unwrap();
    // Encode the three rows one after another.
    let mut data = Vec::new();
    data.extend(encode_avro_long(5));
    data.extend(encode_avro_long(-6));
    data.extend(encode_avro_long(7));
    let mut cursor = AvroCursor::new(&data);
    (0..3).for_each(|_| decoder.decode(&mut cursor).unwrap());
    let flushed = decoder.flush(None).unwrap();
    let micros = flushed
        .as_any()
        .downcast_ref::<DurationMicrosecondArray>()
        .unwrap();
    assert_eq!(micros.values(), &[5, -6, 7]);
}
3295
#[test]
#[cfg(feature = "avro_custom_types")]
fn test_duration_nanoseconds_decoding() {
    let avro_type = AvroDataType::new(Codec::DurationNanos, Default::default(), None);
    let mut decoder = Decoder::try_new(&avro_type).unwrap();
    // Fold the encoded rows into a single input buffer.
    let data = [8i64, 9, -10]
        .into_iter()
        .fold(Vec::new(), |mut buf, value| {
            buf.extend_from_slice(&encode_avro_long(value));
            buf
        });
    let mut cursor = AvroCursor::new(&data);
    let mut remaining = 3;
    while remaining > 0 {
        decoder.decode(&mut cursor).unwrap();
        remaining -= 1;
    }
    let flushed = decoder.flush(None).unwrap();
    let nanos = flushed
        .as_any()
        .downcast_ref::<DurationNanosecondArray>()
        .unwrap();
    assert_eq!(nanos.values(), &[8, 9, -10]);
}
3316
#[test]
fn test_nullable_decode_error_bitmap_corruption() {
    // Regression test: a failed decode of a nullable value must not leave a
    // stray entry behind, otherwise later rows would be misaligned with the
    // validity bitmap.
    let avro_type = AvroDataType::new(
        Codec::Int32,
        Default::default(),
        Some(Nullability::NullSecond),
    );
    let mut decoder = Decoder::try_new(&avro_type).unwrap();

    // Row 1: tag 1 only (the null branch under `NullSecond`).
    let mut row1 = Vec::new();
    row1.extend_from_slice(&encode_avro_int(1));

    // Row 2: tag 0 (value branch) with no payload, so decoding must fail.
    let mut row2 = Vec::new();
    row2.extend_from_slice(&encode_avro_int(0));

    // Row 3: tag 0 followed by the value 42.
    let mut row3 = Vec::new();
    row3.extend_from_slice(&encode_avro_int(0));
    row3.extend_from_slice(&encode_avro_int(42));

    decoder.decode(&mut AvroCursor::new(&row1)).unwrap();
    assert!(decoder.decode(&mut AvroCursor::new(&row2)).is_err());
    decoder.decode(&mut AvroCursor::new(&row3)).unwrap();

    let array = decoder.flush(None).unwrap();

    // Only the two successfully decoded rows may be present.
    assert_eq!(array.len(), 2);
    let int_array = array.as_any().downcast_ref::<Int32Array>().unwrap();
    assert_eq!(int_array.null_count(), 1);
    assert!(int_array.is_null(0));
    // Check validity explicitly: reading `value(1)` alone would not notice a
    // bitmap that wrongly marks the slot as null.
    assert!(int_array.is_valid(1));
    assert_eq!(int_array.value(1), 42);
}
3352
#[test]
fn test_enum_mapping_reordered_symbols() {
    let reader_symbols: Arc<[String]> = ["B", "C", "A"].into_iter().map(String::from).collect();
    // Writer index i is translated to reader index `mapping[i]`; no default.
    let resolution = EnumResolution {
        mapping: Arc::from(vec![2, 0, 1]),
        default_index: -1,
    };
    let mut dec = Decoder::Enum(
        Vec::with_capacity(DEFAULT_CAPACITY),
        reader_symbols,
        Some(resolution),
    );
    let data = [0, 1, 2].map(encode_avro_int).concat();
    let mut cur = AvroCursor::new(&data);
    for _ in 0..3 {
        dec.decode(&mut cur).unwrap();
    }
    let flushed = dec.flush(None).unwrap();
    let dict = flushed
        .as_any()
        .downcast_ref::<DictionaryArray<Int32Type>>()
        .unwrap();
    let want_keys = Int32Array::from(vec![2, 0, 1]);
    assert_eq!(dict.keys(), &want_keys);
    let labels = dict
        .values()
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    for (slot, want) in ["B", "C", "A"].into_iter().enumerate() {
        assert_eq!(labels.value(slot), want);
    }
}
3391
#[test]
fn test_enum_mapping_unknown_symbol_and_out_of_range_fall_back_to_default() {
    let reader_symbols: Arc<[String]> = ["A", "B"].into_iter().map(String::from).collect();
    // Two mapped writer symbols; reader index 1 is the default.
    let resolution = EnumResolution {
        mapping: Arc::from(vec![0, 1]),
        default_index: 1,
    };
    let mut dec = Decoder::Enum(
        Vec::with_capacity(DEFAULT_CAPACITY),
        reader_symbols,
        Some(resolution),
    );
    // Index 99 lies outside the mapping and is expected to yield the default.
    let data = [0, 1, 99].map(encode_avro_int).concat();
    let mut cur = AvroCursor::new(&data);
    for _ in 0..3 {
        dec.decode(&mut cur).unwrap();
    }
    let flushed = dec.flush(None).unwrap();
    let dict = flushed
        .as_any()
        .downcast_ref::<DictionaryArray<Int32Type>>()
        .unwrap();
    let want_keys = Int32Array::from(vec![0, 1, 1]);
    assert_eq!(dict.keys(), &want_keys);
    let labels = dict
        .values()
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    assert_eq!(labels.value(0), "A");
    assert_eq!(labels.value(1), "B");
}
3428
#[test]
fn test_enum_mapping_unknown_symbol_without_default_errors() {
    let reader_symbols: Arc<[String]> = vec!["A".to_string()].into();
    // Both the single mapping entry and the default index are -1.
    let resolution = EnumResolution {
        mapping: Arc::from(vec![-1]),
        default_index: -1,
    };
    let mut dec = Decoder::Enum(
        Vec::with_capacity(DEFAULT_CAPACITY),
        reader_symbols,
        Some(resolution),
    );
    let data = encode_avro_int(0);
    let mut cur = AvroCursor::new(&data);
    let msg = dec
        .decode(&mut cur)
        .expect_err("expected decode error for unresolved enum without default")
        .to_string();
    let mentions_unresolvable = msg.contains("not resolvable");
    let mentions_missing_default = msg.contains("no default");
    assert!(
        mentions_unresolvable && mentions_missing_default,
        "unexpected error message: {msg}"
    );
}
3453
3454 fn make_record_resolved_decoder(
3455 reader_fields: &[(&str, DataType, bool)],
3456 writer_to_reader: Vec<Option<usize>>,
3457 skip_decoders: Vec<Option<Skipper>>,
3458 ) -> Decoder {
3459 let mut field_refs: Vec<FieldRef> = Vec::with_capacity(reader_fields.len());
3460 let mut encodings: Vec<Decoder> = Vec::with_capacity(reader_fields.len());
3461 for (name, dt, nullable) in reader_fields {
3462 field_refs.push(Arc::new(ArrowField::new(*name, dt.clone(), *nullable)));
3463 let enc = match dt {
3464 DataType::Int32 => Decoder::Int32(Vec::new()),
3465 DataType::Int64 => Decoder::Int64(Vec::new()),
3466 DataType::Utf8 => {
3467 Decoder::String(OffsetBufferBuilder::new(DEFAULT_CAPACITY), Vec::new())
3468 }
3469 other => panic!("Unsupported test reader field type: {other:?}"),
3470 };
3471 encodings.push(enc);
3472 }
3473 let fields: Fields = field_refs.into();
3474 Decoder::Record(
3475 fields,
3476 encodings,
3477 Some(Projector {
3478 writer_to_reader: Arc::from(writer_to_reader),
3479 skip_decoders,
3480 field_defaults: vec![None; reader_fields.len()],
3481 default_injections: Arc::from(Vec::<(usize, AvroLiteral)>::new()),
3482 }),
3483 )
3484 }
3485
#[test]
fn test_skip_writer_trailing_field_int32() {
    // Writer has two fields; the second has no reader target and is skipped.
    let mut dec = make_record_resolved_decoder(
        &[("id", DataType::Int32, false)],
        vec![Some(0), None],
        vec![None, Some(Skipper::Int32)],
    );
    let data = [encode_avro_int(7), encode_avro_int(999)].concat();
    let mut cur = AvroCursor::new(&data);
    dec.decode(&mut cur).unwrap();
    // The cursor must have advanced past the skipped field as well.
    assert_eq!(cur.position(), data.len());
    let flushed = dec.flush(None).unwrap();
    let record = flushed.as_any().downcast_ref::<StructArray>().unwrap();
    assert_eq!(record.len(), 1);
    let id_column = record.column_by_name("id").unwrap();
    let id = id_column.as_any().downcast_ref::<Int32Array>().unwrap();
    assert_eq!(id.value(0), 7);
}
3510
#[test]
fn test_skip_writer_middle_field_string() {
    // Writer fields: id, <skipped string>, score.
    let reader_fields = [
        ("id", DataType::Int32, false),
        ("score", DataType::Int64, false),
    ];
    let mut dec = make_record_resolved_decoder(
        &reader_fields,
        vec![Some(0), None, Some(1)],
        vec![None, Some(Skipper::String), None],
    );
    let data = [
        encode_avro_int(42),
        encode_avro_bytes(b"abcdef"),
        encode_avro_long(1000),
    ]
    .concat();
    let mut cur = AvroCursor::new(&data);
    dec.decode(&mut cur).unwrap();
    assert_eq!(cur.position(), data.len());
    let flushed = dec.flush(None).unwrap();
    let record = flushed.as_any().downcast_ref::<StructArray>().unwrap();
    let id_column = record.column_by_name("id").unwrap();
    let id = id_column.as_any().downcast_ref::<Int32Array>().unwrap();
    let score_column = record.column_by_name("score").unwrap();
    let score = score_column.as_any().downcast_ref::<Int64Array>().unwrap();
    assert_eq!(id.value(0), 42);
    assert_eq!(score.value(0), 1000);
}
3545
#[test]
fn test_skip_writer_array_with_negative_block_count_fast() {
    // The writer's first field (an int array) is skipped; the second maps to `id`.
    let mut dec = make_record_resolved_decoder(
        &[("id", DataType::Int32, false)],
        vec![None, Some(0)],
        vec![Some(Skipper::List(Box::new(Skipper::Int32))), None],
    );
    let items = [1, 2, 3].map(encode_avro_int).concat();
    // Block header is a negative count followed by the block's byte length;
    // a zero count ends the array, then the `id` value follows.
    let data = [
        encode_avro_long(-3),
        encode_avro_long(items.len() as i64),
        items,
        encode_avro_long(0),
        encode_avro_int(5),
    ]
    .concat();
    let mut cur = AvroCursor::new(&data);
    dec.decode(&mut cur).unwrap();
    assert_eq!(cur.position(), data.len());
    let flushed = dec.flush(None).unwrap();
    let record = flushed.as_any().downcast_ref::<StructArray>().unwrap();
    let id_column = record.column_by_name("id").unwrap();
    let id = id_column.as_any().downcast_ref::<Int32Array>().unwrap();
    assert_eq!(id.len(), 1);
    assert_eq!(id.value(0), 5);
}
3577
#[test]
fn test_skip_writer_map_with_negative_block_count_fast() {
    // The writer's first field (a map of ints) is skipped; the second maps to `id`.
    let mut dec = make_record_resolved_decoder(
        &[("id", DataType::Int32, false)],
        vec![None, Some(0)],
        vec![Some(Skipper::Map(Box::new(Skipper::Int32))), None],
    );
    let entries = [
        encode_avro_bytes(b"k1"),
        encode_avro_int(10),
        encode_avro_bytes(b"k2"),
        encode_avro_int(20),
    ]
    .concat();
    // Block header is a negative count followed by the block's byte length;
    // a zero count ends the map, then the `id` value follows.
    let data = [
        encode_avro_long(-2),
        encode_avro_long(entries.len() as i64),
        entries,
        encode_avro_long(0),
        encode_avro_int(123),
    ]
    .concat();
    let mut cur = AvroCursor::new(&data);
    dec.decode(&mut cur).unwrap();
    assert_eq!(cur.position(), data.len());
    let flushed = dec.flush(None).unwrap();
    let record = flushed.as_any().downcast_ref::<StructArray>().unwrap();
    let id_column = record.column_by_name("id").unwrap();
    let id = id_column.as_any().downcast_ref::<Int32Array>().unwrap();
    assert_eq!(id.len(), 1);
    assert_eq!(id.value(0), 123);
}
3610
#[test]
fn test_skip_writer_nullable_field_union_nullfirst() {
    // The writer's first field is a skipped nullable int; the second maps to `id`.
    let nullable_int_skipper = Skipper::Nullable(Nullability::NullFirst, Box::new(Skipper::Int32));
    let mut dec = make_record_resolved_decoder(
        &[("id", DataType::Int32, false)],
        vec![None, Some(0)],
        vec![Some(nullable_int_skipper), None],
    );
    // Row 1: tag 0 with nothing to skip, then id = 5.
    let row1 = [encode_avro_long(0), encode_avro_int(5)].concat();
    // Row 2: tag 1 with a skipped int (123), then id = 7.
    let row2 = [
        encode_avro_long(1),
        encode_avro_int(123),
        encode_avro_int(7),
    ]
    .concat();
    let mut cur1 = AvroCursor::new(&row1);
    let mut cur2 = AvroCursor::new(&row2);
    dec.decode(&mut cur1).unwrap();
    dec.decode(&mut cur2).unwrap();
    assert_eq!(cur1.position(), row1.len());
    assert_eq!(cur2.position(), row2.len());
    let flushed = dec.flush(None).unwrap();
    let record = flushed.as_any().downcast_ref::<StructArray>().unwrap();
    let id_column = record.column_by_name("id").unwrap();
    let id = id_column.as_any().downcast_ref::<Int32Array>().unwrap();
    assert_eq!(id.len(), 2);
    assert_eq!(id.value(0), 5);
    assert_eq!(id.value(1), 7);
}
3649
3650 fn make_dense_union_avro(
3651 children: Vec<(Codec, &'_ str, DataType)>,
3652 type_ids: Vec<i8>,
3653 ) -> AvroDataType {
3654 let mut avro_children: Vec<AvroDataType> = Vec::with_capacity(children.len());
3655 let mut fields: Vec<arrow_schema::Field> = Vec::with_capacity(children.len());
3656 for (codec, name, dt) in children.into_iter() {
3657 avro_children.push(AvroDataType::new(codec, Default::default(), None));
3658 fields.push(arrow_schema::Field::new(name, dt, true));
3659 }
3660 let union_fields = UnionFields::new(type_ids, fields);
3661 let union_codec = Codec::Union(avro_children.into(), union_fields, UnionMode::Dense);
3662 AvroDataType::new(union_codec, Default::default(), None)
3663 }
3664
#[test]
fn test_union_dense_two_children_custom_type_ids() {
    let children = vec![
        (Codec::Int32, "i", DataType::Int32),
        (Codec::Utf8, "s", DataType::Utf8),
    ];
    let union_dt = make_dense_union_avro(children, vec![2, 5]);
    let mut dec = Decoder::try_new(&union_dt).unwrap();
    // Each row is a branch index followed by that branch's payload.
    let rows = [
        [encode_avro_long(0), encode_avro_int(7)].concat(),
        [encode_avro_long(1), encode_avro_bytes(b"x")].concat(),
        [encode_avro_long(0), encode_avro_int(-1)].concat(),
    ];
    for row in &rows {
        dec.decode(&mut AvroCursor::new(row)).unwrap();
    }
    let flushed = dec.flush(None).unwrap();
    let ua = flushed
        .as_any()
        .downcast_ref::<UnionArray>()
        .expect("expected UnionArray");
    assert_eq!(ua.len(), 3);
    // Expected (type id, offset into that child) for every row.
    for (row, (type_id, offset)) in [(2, 0), (5, 0), (2, 1)].into_iter().enumerate() {
        assert_eq!(ua.type_id(row), type_id);
        assert_eq!(ua.value_offset(row), offset);
    }
    let int_child = ua
        .child(2)
        .as_any()
        .downcast_ref::<Int32Array>()
        .expect("int child");
    assert_eq!(int_child.len(), 2);
    assert_eq!(int_child.value(0), 7);
    assert_eq!(int_child.value(1), -1);
    let str_child = ua
        .child(5)
        .as_any()
        .downcast_ref::<StringArray>()
        .expect("string child");
    assert_eq!(str_child.len(), 1);
    assert_eq!(str_child.value(0), "x");
}
3715
#[test]
fn test_union_dense_with_null_and_string_children() {
    let children = vec![
        (Codec::Null, "n", DataType::Null),
        (Codec::Utf8, "s", DataType::Utf8),
    ];
    let union_dt = make_dense_union_avro(children, vec![42, 7]);
    let mut dec = Decoder::try_new(&union_dt).unwrap();
    // The null branch rows consist of the branch index only.
    let rows = [
        encode_avro_long(0),
        [encode_avro_long(1), encode_avro_bytes(b"abc")].concat(),
        encode_avro_long(0),
    ];
    for row in &rows {
        dec.decode(&mut AvroCursor::new(row)).unwrap();
    }
    let flushed = dec.flush(None).unwrap();
    let ua = flushed
        .as_any()
        .downcast_ref::<UnionArray>()
        .expect("expected UnionArray");
    assert_eq!(ua.len(), 3);
    // Expected (type id, offset into that child) for every row.
    for (row, (type_id, offset)) in [(42, 0), (7, 0), (42, 1)].into_iter().enumerate() {
        assert_eq!(ua.type_id(row), type_id);
        assert_eq!(ua.value_offset(row), offset);
    }
    let null_child = ua
        .child(42)
        .as_any()
        .downcast_ref::<NullArray>()
        .expect("null child");
    assert_eq!(null_child.len(), 2);
    let str_child = ua
        .child(7)
        .as_any()
        .downcast_ref::<StringArray>()
        .expect("string child");
    assert_eq!(str_child.len(), 1);
    assert_eq!(str_child.value(0), "abc");
}
3760
#[test]
fn test_union_decode_negative_branch_index_errors() {
    let children = vec![
        (Codec::Int32, "i", DataType::Int32),
        (Codec::Utf8, "s", DataType::Utf8),
    ];
    let union_dt = make_dense_union_avro(children, vec![0, 1]);
    let mut dec = Decoder::try_new(&union_dt).unwrap();
    // A branch index of -1 can never name a union child.
    let row = encode_avro_long(-1);
    let mut cursor = AvroCursor::new(&row);
    let msg = dec
        .decode(&mut cursor)
        .expect_err("expected error for negative branch index")
        .to_string();
    assert!(
        msg.contains("Negative union branch index"),
        "unexpected error message: {msg}"
    );
}
3781
#[test]
fn test_union_decode_out_of_range_branch_index_errors() {
    let children = vec![
        (Codec::Int32, "i", DataType::Int32),
        (Codec::Utf8, "s", DataType::Utf8),
    ];
    let union_dt = make_dense_union_avro(children, vec![10, 11]);
    let mut dec = Decoder::try_new(&union_dt).unwrap();
    // Only branches 0 and 1 exist, so index 2 must be rejected.
    let row = encode_avro_long(2);
    let mut cursor = AvroCursor::new(&row);
    let msg = dec
        .decode(&mut cursor)
        .expect_err("expected error for out-of-range branch index")
        .to_string();
    assert!(
        msg.contains("out of range"),
        "unexpected error message: {msg}"
    );
}
3802
#[test]
fn test_union_sparse_mode_not_supported() {
    let children: Vec<AvroDataType> = [Codec::Int32, Codec::Utf8]
        .into_iter()
        .map(|codec| AvroDataType::new(codec, Default::default(), None))
        .collect();
    let arrow_children = vec![
        arrow_schema::Field::new("i", DataType::Int32, true),
        arrow_schema::Field::new("s", DataType::Utf8, true),
    ];
    let uf = UnionFields::new(vec![1, 3], arrow_children);
    let dt = AvroDataType::new(
        Codec::Union(children.into(), uf, UnionMode::Sparse),
        Default::default(),
        None,
    );
    // Building a decoder for a sparse union is expected to fail up front.
    let msg = Decoder::try_new(&dt)
        .expect_err("sparse union should not be supported")
        .to_string();
    assert!(
        msg.contains("Sparse Arrow unions are not yet supported"),
        "unexpected error message: {msg}"
    );
}
3825
3826 fn make_record_decoder_with_projector_defaults(
3827 reader_fields: &[(&str, DataType, bool)],
3828 field_defaults: Vec<Option<AvroLiteral>>,
3829 default_injections: Vec<(usize, AvroLiteral)>,
3830 writer_to_reader_len: usize,
3831 ) -> Decoder {
3832 assert_eq!(
3833 field_defaults.len(),
3834 reader_fields.len(),
3835 "field_defaults must have one entry per reader field"
3836 );
3837 let mut field_refs: Vec<FieldRef> = Vec::with_capacity(reader_fields.len());
3838 let mut encodings: Vec<Decoder> = Vec::with_capacity(reader_fields.len());
3839 for (name, dt, nullable) in reader_fields {
3840 field_refs.push(Arc::new(ArrowField::new(*name, dt.clone(), *nullable)));
3841 let enc = match dt {
3842 DataType::Int32 => Decoder::Int32(Vec::with_capacity(DEFAULT_CAPACITY)),
3843 DataType::Int64 => Decoder::Int64(Vec::with_capacity(DEFAULT_CAPACITY)),
3844 DataType::Utf8 => Decoder::String(
3845 OffsetBufferBuilder::new(DEFAULT_CAPACITY),
3846 Vec::with_capacity(DEFAULT_CAPACITY),
3847 ),
3848 other => panic!("Unsupported test field type in helper: {other:?}"),
3849 };
3850 encodings.push(enc);
3851 }
3852 let fields: Fields = field_refs.into();
3853 let skip_decoders: Vec<Option<Skipper>> =
3854 (0..writer_to_reader_len).map(|_| None::<Skipper>).collect();
3855 let projector = Projector {
3856 writer_to_reader: Arc::from(vec![None; writer_to_reader_len]),
3857 skip_decoders,
3858 field_defaults,
3859 default_injections: Arc::from(default_injections),
3860 };
3861 Decoder::Record(fields, encodings, Some(projector))
3862 }
3863
#[test]
fn test_default_append_int32_and_int64_from_int_and_long() {
    // Int32 decoder takes an `Int` default.
    let mut int_decoder = Decoder::Int32(Vec::with_capacity(DEFAULT_CAPACITY));
    int_decoder.append_default(&AvroLiteral::Int(42)).unwrap();
    let int_flushed = int_decoder.flush(None).unwrap();
    let ints = int_flushed.as_any().downcast_ref::<Int32Array>().unwrap();
    assert_eq!(ints.len(), 1);
    assert_eq!(ints.value(0), 42);

    // Int64 decoder takes both `Int` and `Long` defaults.
    let mut long_decoder = Decoder::Int64(Vec::with_capacity(DEFAULT_CAPACITY));
    for literal in [AvroLiteral::Int(5), AvroLiteral::Long(7)] {
        long_decoder.append_default(&literal).unwrap();
    }
    let long_flushed = long_decoder.flush(None).unwrap();
    let longs = long_flushed.as_any().downcast_ref::<Int64Array>().unwrap();
    assert_eq!(longs.len(), 2);
    assert_eq!(longs.value(0), 5);
    assert_eq!(longs.value(1), 7);
}
3881
#[test]
fn test_default_append_floats_and_doubles() {
    // Float32 decoder with a `Float` default.
    let mut float_decoder = Decoder::Float32(Vec::with_capacity(DEFAULT_CAPACITY));
    float_decoder
        .append_default(&AvroLiteral::Float(1.5))
        .unwrap();
    let float_flushed = float_decoder.flush(None).unwrap();
    let floats = float_flushed
        .as_any()
        .downcast_ref::<Float32Array>()
        .unwrap();
    assert_eq!(floats.value(0), 1.5);

    // Float64 decoder with a `Double` default.
    let mut double_decoder = Decoder::Float64(Vec::with_capacity(DEFAULT_CAPACITY));
    double_decoder
        .append_default(&AvroLiteral::Double(2.25))
        .unwrap();
    let double_flushed = double_decoder.flush(None).unwrap();
    let doubles = double_flushed
        .as_any()
        .downcast_ref::<Float64Array>()
        .unwrap();
    assert_eq!(doubles.value(0), 2.25);
}
3895
#[test]
fn test_default_append_string_and_bytes() {
    // Fresh string decoder; used for both the success and the failure case.
    let new_string_decoder = || {
        Decoder::String(
            OffsetBufferBuilder::new(DEFAULT_CAPACITY),
            Vec::with_capacity(DEFAULT_CAPACITY),
        )
    };

    let mut string_decoder = new_string_decoder();
    let string_default = AvroLiteral::String("hi".into());
    string_decoder.append_default(&string_default).unwrap();
    let string_flushed = string_decoder.flush(None).unwrap();
    let strings = string_flushed
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    assert_eq!(strings.value(0), "hi");

    let mut bytes_decoder = Decoder::Binary(
        OffsetBufferBuilder::new(DEFAULT_CAPACITY),
        Vec::with_capacity(DEFAULT_CAPACITY),
    );
    let bytes_default = AvroLiteral::Bytes(vec![1, 2, 3]);
    bytes_decoder.append_default(&bytes_default).unwrap();
    let bytes_flushed = bytes_decoder.flush(None).unwrap();
    let binaries = bytes_flushed
        .as_any()
        .downcast_ref::<BinaryArray>()
        .unwrap();
    assert_eq!(binaries.value(0), &[1, 2, 3]);

    // A bytes literal is not accepted as the default of a string decoder.
    let mut rejecting_decoder = new_string_decoder();
    let mismatched_default = AvroLiteral::Bytes(vec![0x61, 0x62]);
    let err = rejecting_decoder
        .append_default(&mismatched_default)
        .unwrap_err();
    assert!(
        err.to_string()
            .contains("Default for string must be string"),
        "unexpected error: {err:?}"
    );
}
3931
#[test]
fn test_default_append_nullable_int32_null_and_value() {
    let mut dec = Decoder::Nullable(
        Nullability::NullFirst,
        NullBufferBuilder::new(DEFAULT_CAPACITY),
        Box::new(Decoder::Int32(Vec::with_capacity(DEFAULT_CAPACITY))),
        NullablePlan::ReadTag,
    );
    // One null default followed by one concrete default.
    for literal in [AvroLiteral::Null, AvroLiteral::Int(11)] {
        dec.append_default(&literal).unwrap();
    }
    let flushed = dec.flush(None).unwrap();
    let ints = flushed.as_any().downcast_ref::<Int32Array>().unwrap();
    assert_eq!(ints.len(), 2);
    assert!(ints.is_null(0));
    assert_eq!(ints.value(1), 11);
}
3949
#[test]
fn test_default_append_array_of_ints() {
    let item_type = avro_from_codec(Codec::Int32);
    let list_type = avro_from_codec(Codec::List(Arc::new(item_type)));
    let mut decoder = Decoder::try_new(&list_type).unwrap();
    // A single array default holding three int literals.
    let items: Vec<AvroLiteral> = [1, 2, 3].into_iter().map(AvroLiteral::Int).collect();
    decoder.append_default(&AvroLiteral::Array(items)).unwrap();
    let flushed = decoder.flush(None).unwrap();
    let list = flushed.as_any().downcast_ref::<ListArray>().unwrap();
    assert_eq!(list.len(), 1);
    assert_eq!(list.value_length(0), 3);
    let elements = list.values().as_any().downcast_ref::<Int32Array>().unwrap();
    assert_eq!(elements.values(), &[1, 2, 3]);
}
3967
/// A `Map` default with two entries becomes a single map row whose entries
/// pair each key with its own value.
///
/// The check compares `(key, value)` pairs rather than keys and values as two
/// independent sets, so a decoder that swapped the values between keys is
/// caught. Entry order is deliberately not asserted.
#[test]
fn test_default_append_map_string_to_int() {
    let map_dt = avro_from_codec(Codec::Map(Arc::new(avro_from_codec(Codec::Int32))));
    let mut d = Decoder::try_new(&map_dt).unwrap();
    let mut m: IndexMap<String, AvroLiteral> = IndexMap::new();
    m.insert("k1".to_string(), AvroLiteral::Int(10));
    m.insert("k2".to_string(), AvroLiteral::Int(20));
    d.append_default(&AvroLiteral::Map(m)).unwrap();
    let arr = d.flush(None).unwrap();
    let map = arr.as_any().downcast_ref::<MapArray>().unwrap();
    assert_eq!(map.len(), 1);
    assert_eq!(map.value_length(0), 2);
    let binding = map.value(0);
    let entries = binding.as_any().downcast_ref::<StructArray>().unwrap();
    let k = entries
        .column_by_name("key")
        .unwrap()
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    let v = entries
        .column_by_name("value")
        .unwrap()
        .as_any()
        .downcast_ref::<Int32Array>()
        .unwrap();
    // Both columns belong to the same entries struct, so they must line up row by row.
    assert_eq!(k.len(), 2);
    assert_eq!(v.len(), 2);
    let pairs: std::collections::HashMap<&str, i32> =
        (0..k.len()).map(|i| (k.value(i), v.value(i))).collect();
    assert_eq!(pairs, [("k1", 10), ("k2", 20)].into_iter().collect());
}
3999
/// An `Enum` default given by symbol name is stored as that symbol's index
/// (`"B"` -> key 1) in the flushed dictionary array.
#[test]
fn test_default_append_enum_by_symbol() {
    let symbols: Arc<[String]> = vec!["A".into(), "B".into(), "C".into()].into();
    let mut decoder = Decoder::Enum(
        Vec::with_capacity(DEFAULT_CAPACITY),
        Arc::clone(&symbols),
        None,
    );
    let default = AvroLiteral::Enum("B".into());
    decoder.append_default(&default).unwrap();
    let flushed = decoder.flush(None).unwrap();
    let dictionary = flushed
        .as_any()
        .downcast_ref::<DictionaryArray<Int32Type>>()
        .unwrap();
    assert_eq!(dictionary.len(), 1);
    assert_eq!(dictionary.keys(), &Int32Array::from(vec![1]));
    let labels = dictionary
        .values()
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    assert_eq!(labels.value(1), "B");
}
4020
/// A UUID decoder accepts a `String` default (flushed as one 16-byte fixed
/// value) and rejects a `Bytes` default with a descriptive error.
#[test]
fn test_default_append_uuid_and_type_error() {
    // Accepted: textual UUID.
    let mut uuid_decoder = Decoder::Uuid(Vec::with_capacity(DEFAULT_CAPACITY));
    let textual = AvroLiteral::String("123e4567-e89b-12d3-a456-426614174000".into());
    uuid_decoder.append_default(&textual).unwrap();
    let flushed = uuid_decoder.flush(None).unwrap();
    let fixed = flushed
        .as_any()
        .downcast_ref::<FixedSizeBinaryArray>()
        .unwrap();
    assert_eq!(fixed.value_length(), 16);
    assert_eq!(fixed.len(), 1);

    // Rejected: raw bytes, even when they are 16 bytes long.
    let mut rejecting = Decoder::Uuid(Vec::with_capacity(DEFAULT_CAPACITY));
    let raw = AvroLiteral::Bytes(vec![0u8; 16]);
    let err = rejecting.append_default(&raw).unwrap_err();
    let rendered = err.to_string();
    assert!(
        rendered.contains("Default for uuid must be string"),
        "unexpected error: {err:?}"
    );
}
4043
/// `Decoder::Fixed(4, ..)` accepts a 4-byte `Bytes` default and reports an
/// error for a 3-byte one.
#[test]
fn test_default_append_fixed_and_length_mismatch() {
    // Accepted: payload length equals the fixed size.
    let mut fixed_decoder = Decoder::Fixed(4, Vec::with_capacity(DEFAULT_CAPACITY));
    let exact = AvroLiteral::Bytes(vec![1, 2, 3, 4]);
    fixed_decoder.append_default(&exact).unwrap();
    let flushed = fixed_decoder.flush(None).unwrap();
    let fixed = flushed
        .as_any()
        .downcast_ref::<FixedSizeBinaryArray>()
        .unwrap();
    assert_eq!(fixed.value_length(), 4);
    assert_eq!(fixed.value(0), &[1, 2, 3, 4]);

    // Rejected: payload is one byte short.
    let mut rejecting = Decoder::Fixed(4, Vec::with_capacity(DEFAULT_CAPACITY));
    let too_short = AvroLiteral::Bytes(vec![1, 2, 3]);
    let err = rejecting.append_default(&too_short).unwrap_err();
    let rendered = err.to_string();
    assert!(
        rendered.contains("Fixed default length"),
        "unexpected error: {err:?}"
    );
}
4065
/// An interval decoder accepts a 12-byte default made of three little-endian
/// `u32` values (1, 2, 3) and rejects an 11-byte default.
///
/// The expected interval is 1 month, 2 days and 3_000_000 nanoseconds.
#[test]
fn test_default_append_duration_and_length_validation() {
    let interval_type = avro_from_codec(Codec::Interval);
    let mut decoder = Decoder::try_new(&interval_type).unwrap();
    let encoded: Vec<u8> = [1u32, 2u32, 3u32]
        .into_iter()
        .flat_map(u32::to_le_bytes)
        .collect();
    decoder.append_default(&AvroLiteral::Bytes(encoded)).unwrap();
    let flushed = decoder.flush(None).unwrap();
    let intervals = flushed
        .as_any()
        .downcast_ref::<IntervalMonthDayNanoArray>()
        .unwrap();
    assert_eq!(intervals.len(), 1);
    let first = intervals.value(0);
    assert_eq!(first.months, 1);
    assert_eq!(first.days, 2);
    assert_eq!(first.nanoseconds, 3_000_000);

    let mut rejecting = Decoder::try_new(&avro_from_codec(Codec::Interval)).unwrap();
    let truncated = AvroLiteral::Bytes(vec![0u8; 11]);
    let err = rejecting.append_default(&truncated).unwrap_err();
    let rendered = err.to_string();
    assert!(
        rendered.contains("Duration default must be exactly 12 bytes"),
        "unexpected error: {err:?}"
    );
}
4095
/// A `Decimal(50, 2)` decoder with a 32-byte fixed size turns big-endian
/// two's-complement defaults into `Decimal256` values.
///
/// `0x…3039` is 12345 and `0xFF…85` is -123, which render as `123.45` and
/// `-1.23` at scale 2.
#[test]
fn test_default_append_decimal256_from_bytes() {
    let decimal_type = avro_from_codec(Codec::Decimal(50, Some(2), Some(32)));
    let mut decoder = Decoder::try_new(&decimal_type).unwrap();

    // Positive value: zero-filled except for the two low-order bytes.
    let mut positive = [0x00u8; 32];
    positive[30] = 0x30;
    positive[31] = 0x39;
    decoder
        .append_default(&AvroLiteral::Bytes(positive.to_vec()))
        .unwrap();

    // Negative value: sign-extended with 0xFF down to the last byte.
    let mut negative = [0xFFu8; 32];
    negative[31] = 0x85;
    decoder
        .append_default(&AvroLiteral::Bytes(negative.to_vec()))
        .unwrap();

    let flushed = decoder.flush(None).unwrap();
    let decimals = flushed.as_any().downcast_ref::<Decimal256Array>().unwrap();
    assert_eq!(decimals.len(), 2);
    assert_eq!(decimals.value_as_string(0), "123.45");
    assert_eq!(decimals.value_as_string(1), "-1.23");
}
4118
/// When a record default map omits a field, the missing field takes the
/// per-field default supplied to the helper (`"hi"` for `b`), while the
/// provided field keeps its own value (`7` for `a`).
#[test]
fn test_record_append_default_map_missing_fields_uses_projector_field_defaults() {
    let schema_fields = [("a", DataType::Int32, false), ("b", DataType::Utf8, false)];
    let per_field_defaults = vec![None, Some(AvroLiteral::String("hi".into()))];
    let mut record =
        make_record_decoder_with_projector_defaults(&schema_fields, per_field_defaults, vec![], 0);

    // Only `a` is present in the default map; `b` is left out.
    let mut partial: IndexMap<String, AvroLiteral> = IndexMap::new();
    partial.insert("a".to_string(), AvroLiteral::Int(7));
    record.append_default(&AvroLiteral::Map(partial)).unwrap();

    let flushed = record.flush(None).unwrap();
    let rows = flushed.as_any().downcast_ref::<StructArray>().unwrap();
    let col_a = rows
        .column_by_name("a")
        .unwrap()
        .as_any()
        .downcast_ref::<Int32Array>()
        .unwrap();
    assert_eq!(col_a.value(0), 7);
    let col_b = rows
        .column_by_name("b")
        .unwrap()
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    assert_eq!(col_b.value(0), "hi");
}
4148
/// Appending a `Null` default to a record fills every field from the
/// per-field defaults supplied to the helper (`5` and `"x"`).
#[test]
fn test_record_append_default_null_uses_projector_field_defaults() {
    let schema_fields = [("a", DataType::Int32, false), ("b", DataType::Utf8, false)];
    let per_field_defaults = vec![
        Some(AvroLiteral::Int(5)),
        Some(AvroLiteral::String("x".into())),
    ];
    let mut record =
        make_record_decoder_with_projector_defaults(&schema_fields, per_field_defaults, vec![], 0);
    record.append_default(&AvroLiteral::Null).unwrap();

    let flushed = record.flush(None).unwrap();
    let rows = flushed.as_any().downcast_ref::<StructArray>().unwrap();
    let col_a = rows
        .column_by_name("a")
        .unwrap()
        .as_any()
        .downcast_ref::<Int32Array>()
        .unwrap();
    assert_eq!(col_a.value(0), 5);
    let col_b = rows
        .column_by_name("b")
        .unwrap()
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    assert_eq!(col_b.value(0), "x");
}
4179
/// With no per-field defaults on the projector, a field missing from the
/// record default map ends up null when its decoder is nullable, while the
/// provided field keeps its value.
#[test]
fn test_record_append_default_missing_fields_without_projector_defaults_yields_type_nulls_or_empties()
{
    // Two nullable columns: `a: Int32` and `b: Utf8`.
    let arrow_fields: Vec<FieldRef> = vec![
        Arc::new(ArrowField::new("a", DataType::Int32, true)),
        Arc::new(ArrowField::new("b", DataType::Utf8, true)),
    ];
    let int_column = Decoder::Nullable(
        Nullability::NullSecond,
        NullBufferBuilder::new(DEFAULT_CAPACITY),
        Box::new(Decoder::Int32(Vec::with_capacity(DEFAULT_CAPACITY))),
        NullablePlan::ReadTag,
    );
    let string_column = Decoder::Nullable(
        Nullability::NullSecond,
        NullBufferBuilder::new(DEFAULT_CAPACITY),
        Box::new(Decoder::String(
            OffsetBufferBuilder::new(DEFAULT_CAPACITY),
            Vec::with_capacity(DEFAULT_CAPACITY),
        )),
        NullablePlan::ReadTag,
    );
    let column_decoders: Vec<Decoder> = vec![int_column, string_column];

    // Projector that carries neither field defaults nor injections.
    let projector = Projector {
        writer_to_reader: Arc::from(vec![]),
        skip_decoders: vec![],
        field_defaults: vec![None, None],
        default_injections: Arc::from(Vec::<(usize, AvroLiteral)>::new()),
    };
    let mut record = Decoder::Record(arrow_fields.into(), column_decoders, Some(projector));

    // Only `a` is present in the default map.
    let mut partial: IndexMap<String, AvroLiteral> = IndexMap::new();
    partial.insert("a".to_string(), AvroLiteral::Int(9));
    record.append_default(&AvroLiteral::Map(partial)).unwrap();

    let flushed = record.flush(None).unwrap();
    let rows = flushed.as_any().downcast_ref::<StructArray>().unwrap();
    let col_a = rows
        .column_by_name("a")
        .unwrap()
        .as_any()
        .downcast_ref::<Int32Array>()
        .unwrap();
    let col_b = rows
        .column_by_name("b")
        .unwrap()
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    assert!(col_a.is_valid(0));
    assert_eq!(col_a.value(0), 9);
    assert!(col_b.is_null(0));
}
4234
/// Decoding from an empty cursor still produces a row: both fields are
/// filled from the default injections handed to the helper
/// (`id = 99`, `name = "alice"`).
#[test]
fn test_projector_default_injection_when_writer_lacks_fields() {
    let schema_fields = [
        ("id", DataType::Int32, false),
        ("name", DataType::Utf8, false),
    ];
    let injected = vec![
        (0, AvroLiteral::Int(99)),
        (1, AvroLiteral::String("alice".into())),
    ];
    let mut record =
        make_record_decoder_with_projector_defaults(&schema_fields, vec![None, None], injected, 0);

    // No input bytes are supplied.
    let mut empty_cursor = AvroCursor::new(&[]);
    record.decode(&mut empty_cursor).unwrap();

    let flushed = record.flush(None).unwrap();
    let rows = flushed.as_any().downcast_ref::<StructArray>().unwrap();
    let ids = rows
        .column_by_name("id")
        .unwrap()
        .as_any()
        .downcast_ref::<Int32Array>()
        .unwrap();
    assert_eq!(ids.value(0), 99);
    let names = rows
        .column_by_name("name")
        .unwrap()
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    assert_eq!(names.value(0), "alice");
}
4269
/// Union type ids reported by the flushed array come from `UnionFields`
/// (42 and 7 here), not from the positional branch index in the encoded data.
#[test]
fn union_type_ids_are_not_child_indexes() {
    // Type ids are deliberately unrelated to the branch positions 0 and 1.
    let fields: UnionFields = [
        (42_i8, Arc::new(ArrowField::new("a", DataType::Int32, true))),
        (7_i8, Arc::new(ArrowField::new("b", DataType::Utf8, true))),
    ]
    .into_iter()
    .collect();
    let branch_types: Vec<AvroDataType> =
        vec![avro_from_codec(Codec::Int32), avro_from_codec(Codec::Utf8)];
    let union_type = avro_from_codec(Codec::Union(
        branch_types.into(),
        fields.clone(),
        UnionMode::Dense,
    ));
    let mut decoder = Decoder::try_new(&union_type).expect("decoder");

    // Row 0: branch index 1 (string "hi").
    let mut string_row = encode_avro_long(1);
    string_row.extend(encode_avro_bytes("hi".as_bytes()));
    decoder
        .decode(&mut AvroCursor::new(&string_row))
        .expect("decode b1");

    // Row 1: branch index 0 (int 5).
    let mut int_row = encode_avro_long(0);
    int_row.extend(encode_avro_int(5));
    decoder
        .decode(&mut AvroCursor::new(&int_row))
        .expect("decode b0");

    let flushed = decoder.flush(None).expect("flush");
    let union = flushed
        .as_any()
        .downcast_ref::<UnionArray>()
        .expect("union");
    assert_eq!(union.len(), 2);
    assert_eq!(union.type_id(0), 7, "type id must come from UnionFields");
    assert_eq!(union.type_id(1), 42, "type id must come from UnionFields");
    assert_eq!(union.value_offset(0), 0);
    assert_eq!(union.value_offset(1), 0);

    let strings = union
        .child(7)
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    assert_eq!(strings.len(), 1);
    assert_eq!(strings.value(0), "hi");

    let ints = union
        .child(42)
        .as_any()
        .downcast_ref::<Int32Array>()
        .unwrap();
    assert_eq!(ints.len(), 1);
    assert_eq!(ints.value(0), 5);

    let declared_ids: Vec<i8> = fields.iter().map(|(tid, _)| tid).collect();
    assert_eq!(declared_ids, vec![42_i8, 7_i8]);
}
4308
/// Every custom duration codec (nanos, micros, millis, seconds) is expected
/// to produce a `Skipper::Int64`.
#[cfg(feature = "avro_custom_types")]
#[test]
fn skipper_from_avro_maps_custom_duration_variants_to_int64() -> Result<(), ArrowError> {
    let duration_codecs = [
        Codec::DurationNanos,
        Codec::DurationMicros,
        Codec::DurationMillis,
        Codec::DurationSeconds,
    ];
    for codec in duration_codecs {
        let dt = make_avro_dt(codec.clone(), None);
        let skipper = Skipper::from_avro(&dt)?;
        assert!(
            matches!(skipper, Skipper::Int64),
            "expected Int64 skipper for {:?}, got {:?}",
            codec,
            skipper
        );
    }
    Ok(())
}
4327
/// For each custom duration codec, skipping one encoded long must advance
/// the cursor by exactly the encoded length, across a spread of small,
/// negative and large values.
#[cfg(feature = "avro_custom_types")]
#[test]
fn skipper_skip_consumes_one_long_for_custom_durations() -> Result<(), ArrowError> {
    let samples: [i64; 7] = [0, 1, -1, 150, -150, i64::MAX / 3, i64::MIN / 3];
    let duration_codecs = [
        Codec::DurationNanos,
        Codec::DurationMicros,
        Codec::DurationMillis,
        Codec::DurationSeconds,
    ];
    for codec in duration_codecs {
        let dt = make_avro_dt(codec.clone(), None);
        let mut skipper = Skipper::from_avro(&dt)?;
        for v in samples {
            let encoded = encode_avro_long(v);
            let mut cursor = AvroCursor::new(&encoded);
            skipper.skip(&mut cursor)?;
            assert_eq!(
                cursor.position(),
                encoded.len(),
                "did not consume all bytes for {:?} value {}",
                codec,
                v
            );
        }
    }
    Ok(())
}
4355
/// A null-first nullable duration skipper wraps `Skipper::Int64`, consumes
/// only the tag byte for tag 0, and consumes tag plus long for tag 1.
#[cfg(feature = "avro_custom_types")]
#[test]
fn skipper_nullable_custom_duration_respects_null_first() -> Result<(), ArrowError> {
    let dt = make_avro_dt(Codec::DurationNanos, Some(Nullability::NullFirst));
    let mut skipper = Skipper::from_avro(&dt)?;

    // Shape check: Nullable(NullFirst, Int64).
    let Skipper::Nullable(Nullability::NullFirst, inner) = &skipper else {
        panic!("expected Nullable(NullFirst, Int64), got {:?}", skipper)
    };
    assert!(
        matches!(**inner, Skipper::Int64),
        "expected inner Int64, got {:?}",
        inner
    );

    // Tag 0: nothing follows the tag.
    let tag_only = encode_vlq_u64(0);
    let mut cursor = AvroCursor::new(&tag_only);
    skipper.skip(&mut cursor)?;
    assert_eq!(cursor.position(), 1, "expected to consume only tag=0");

    // Tag 1: a long follows the tag.
    let mut tag_and_value = encode_vlq_u64(1);
    tag_and_value.extend(encode_avro_long(0));
    let mut cursor = AvroCursor::new(&tag_and_value);
    skipper.skip(&mut cursor)?;
    assert_eq!(cursor.position(), 2, "expected to consume tag=1 + long(0)");

    Ok(())
}
4384
/// A null-second nullable duration skipper wraps `Skipper::Int64`, consumes
/// only the tag byte for tag 1, and consumes tag plus long for tag 0.
#[cfg(feature = "avro_custom_types")]
#[test]
fn skipper_nullable_custom_duration_respects_null_second() -> Result<(), ArrowError> {
    let dt = make_avro_dt(Codec::DurationMicros, Some(Nullability::NullSecond));
    let mut skipper = Skipper::from_avro(&dt)?;

    // Shape check: Nullable(NullSecond, Int64).
    let Skipper::Nullable(Nullability::NullSecond, inner) = &skipper else {
        panic!("expected Nullable(NullSecond, Int64), got {:?}", skipper)
    };
    assert!(
        matches!(**inner, Skipper::Int64),
        "expected inner Int64, got {:?}",
        inner
    );

    // Tag 1: nothing follows the tag.
    let tag_only = encode_vlq_u64(1);
    let mut cursor = AvroCursor::new(&tag_only);
    skipper.skip(&mut cursor)?;
    assert_eq!(cursor.position(), 1, "expected to consume only tag=1");

    // Tag 0: a long follows the tag.
    let mut tag_and_value = encode_vlq_u64(0);
    tag_and_value.extend(encode_avro_long(-1));
    let mut cursor = AvroCursor::new(&tag_and_value);
    skipper.skip(&mut cursor)?;
    let expected_position = 1 + encode_avro_long(-1).len();
    assert_eq!(
        cursor.position(),
        expected_position,
        "expected to consume tag=0 + long(-1)"
    );
    Ok(())
}
4416
/// The interval codec yields `Skipper::DurationFixed12`, and skipping
/// advances the cursor by 12 bytes.
#[test]
fn skipper_interval_is_fixed12_and_skips_12_bytes() -> Result<(), ArrowError> {
    let dt = make_avro_dt(Codec::Interval, None);
    let mut skipper = Skipper::from_avro(&dt)?;
    assert!(
        matches!(skipper, Skipper::DurationFixed12),
        "expected DurationFixed12, got {:?}",
        skipper
    );
    let twelve_zero_bytes = vec![0u8; 12];
    let mut cursor = AvroCursor::new(&twelve_zero_bytes);
    skipper.skip(&mut cursor)?;
    assert_eq!(cursor.position(), 12, "expected to consume 12 fixed bytes");
    Ok(())
}
4431
/// Nine ints in three runs (1x3, 2x2, 3x4) decode into a 16-bit run-end
/// array with run ends `[3, 5, 9]` and values `[1, 2, 3]`.
#[cfg(feature = "avro_custom_types")]
#[test]
fn test_run_end_encoded_width16_int32_basic_grouping() {
    use arrow_array::RunArray;
    use std::sync::Arc;
    let value_type = avro_from_codec(Codec::Int32);
    let ree_type = AvroDataType::new(
        Codec::RunEndEncoded(Arc::new(value_type), 16),
        Default::default(),
        None,
    );
    let mut decoder = Decoder::try_new(&ree_type).expect("create REE decoder");
    let inputs = [1, 1, 1, 2, 2, 3, 3, 3, 3];
    for v in inputs {
        let encoded = encode_avro_int(v);
        let mut cursor = AvroCursor::new(&encoded);
        decoder.decode(&mut cursor).expect("decode");
    }
    let flushed = decoder.flush(None).expect("flush");
    let runs = flushed
        .as_any()
        .downcast_ref::<RunArray<Int16Type>>()
        .expect("RunArray<Int16Type>");
    assert_eq!(runs.len(), 9);
    assert_eq!(runs.run_ends().values(), &[3, 5, 9]);
    let run_values = runs
        .values()
        .as_ref()
        .as_any()
        .downcast_ref::<Int32Array>()
        .expect("values Int32");
    assert_eq!(run_values.values(), &[1, 2, 3]);
}
4463
/// With a null-second nullable Int32 value type, consecutive nulls are
/// grouped into runs just like equal values: the eight inputs collapse to
/// run ends `[2, 5, 6, 8]` over values `[null, 7, null, 5]`.
#[cfg(feature = "avro_custom_types")]
#[test]
fn test_run_end_encoded_width32_nullable_values_group_nulls() {
    use arrow_array::RunArray;
    use std::sync::Arc;
    let value_type = AvroDataType::new(
        Codec::Int32,
        Default::default(),
        Some(Nullability::NullSecond),
    );
    let ree_type = AvroDataType::new(
        Codec::RunEndEncoded(Arc::new(value_type), 32),
        Default::default(),
        None,
    );
    let mut decoder = Decoder::try_new(&ree_type).expect("create REE decoder");
    let inputs: [Option<i32>; 8] = [
        None,
        None,
        Some(7),
        Some(7),
        Some(7),
        None,
        Some(5),
        Some(5),
    ];
    for item in inputs {
        // Null-second layout: tag 0 precedes a value, tag 1 stands for null.
        let mut encoded = Vec::new();
        if let Some(v) = item {
            encoded.extend_from_slice(&encode_vlq_u64(0));
            encoded.extend_from_slice(&encode_avro_int(v));
        } else {
            encoded.extend_from_slice(&encode_vlq_u64(1));
        }
        decoder
            .decode(&mut AvroCursor::new(&encoded))
            .expect("decode");
    }
    let flushed = decoder.flush(None).expect("flush");
    let runs = flushed
        .as_any()
        .downcast_ref::<RunArray<Int32Type>>()
        .expect("RunArray<Int32Type>");
    assert_eq!(runs.len(), 8);
    assert_eq!(runs.run_ends().values(), &[2, 5, 6, 8]);
    let run_values = runs
        .values()
        .as_ref()
        .as_any()
        .downcast_ref::<Int32Array>()
        .expect("values Int32 (nullable)");
    assert_eq!(run_values.len(), 4);
    assert!(run_values.is_null(0));
    assert_eq!(run_values.value(1), 7);
    assert!(run_values.is_null(2));
    assert_eq!(run_values.value(3), 5);
}
4520
/// A run-end-encoded Float64 decoder wrapped in a `Nullable` using the
/// `FromSingle` plan with `IntToDouble` promotion decodes Avro ints into
/// `f64` run values: inputs `1, 1, 2, 2, 2` give run ends `[2, 5]` over
/// values `[1.0, 2.0]` in a `RunArray<Int64Type>`.
#[cfg(feature = "avro_custom_types")]
#[test]
fn test_run_end_encoded_decode_with_promotion_int_to_double_via_nullable_from_single() {
    use arrow_array::RunArray;
    let float_values = Decoder::Float64(Vec::with_capacity(DEFAULT_CAPACITY));
    let run_end_encoded = Decoder::RunEndEncoded(8, 0, Box::new(float_values));
    let plan = NullablePlan::FromSingle {
        promotion: Promotion::IntToDouble,
    };
    let mut decoder = Decoder::Nullable(
        Nullability::NullSecond,
        NullBufferBuilder::new(DEFAULT_CAPACITY),
        Box::new(run_end_encoded),
        plan,
    );
    let inputs = [1, 1, 2, 2, 2];
    for v in inputs {
        let encoded = encode_avro_int(v);
        let mut cursor = AvroCursor::new(&encoded);
        decoder.decode(&mut cursor).expect("decode");
    }
    let flushed = decoder.flush(None).expect("flush");
    let runs = flushed
        .as_any()
        .downcast_ref::<RunArray<Int64Type>>()
        .expect("RunArray<Int64Type>");
    assert_eq!(runs.len(), 5);
    assert_eq!(runs.run_ends().values(), &[2, 5]);
    let run_values = runs
        .values()
        .as_ref()
        .as_any()
        .downcast_ref::<Float64Array>()
        .expect("values Float64");
    assert_eq!(run_values.values(), &[1.0, 2.0]);
}
4558
/// Building a decoder for a run-end width of 3 must fail with an error that
/// names the problem and lists the accepted widths.
#[cfg(feature = "avro_custom_types")]
#[test]
fn test_run_end_encoded_unsupported_run_end_width_errors() {
    use std::sync::Arc;
    let value_type = avro_from_codec(Codec::Int32);
    let bad_width_type = AvroDataType::new(
        Codec::RunEndEncoded(Arc::new(value_type), 3),
        Default::default(),
        None,
    );
    let msg = Decoder::try_new(&bad_width_type)
        .expect_err("must reject unsupported width")
        .to_string();
    let names_problem = msg.contains("Unsupported run-end width");
    let lists_widths = msg.contains("16/32/64 bits or 2/4/8 bytes");
    assert!(
        names_problem && lists_widths,
        "unexpected error message: {msg}"
    );
}
4577
/// Flushing a run-end-encoded decoder (width given as 4) that never decoded
/// anything yields an empty `RunArray<Int32Type>` with no run ends and no values.
#[cfg(feature = "avro_custom_types")]
#[test]
fn test_run_end_encoded_empty_input_is_empty_runarray() {
    use arrow_array::RunArray;
    use std::sync::Arc;
    let value_type = avro_from_codec(Codec::Utf8);
    let ree_type = AvroDataType::new(
        Codec::RunEndEncoded(Arc::new(value_type), 4),
        Default::default(),
        None,
    );
    let mut decoder = Decoder::try_new(&ree_type).expect("create REE decoder");
    // No decode calls: flush straight away.
    let flushed = decoder.flush(None).expect("flush");
    let runs = flushed
        .as_any()
        .downcast_ref::<RunArray<Int32Type>>()
        .expect("RunArray<Int32Type>");
    assert_eq!(runs.len(), 0);
    assert_eq!(runs.run_ends().len(), 0);
    assert_eq!(runs.values().len(), 0);
}
4599
/// String inputs `a, a, bb, bb, bb, a` decode into run ends `[2, 5, 6]` over
/// values `["a", "bb", "a"]`; the trailing `"a"` starts a new run rather than
/// merging with the first one.
#[cfg(feature = "avro_custom_types")]
#[test]
fn test_run_end_encoded_strings_grouping_width32_bits() {
    use arrow_array::RunArray;
    use std::sync::Arc;
    let value_type = avro_from_codec(Codec::Utf8);
    let ree_type = AvroDataType::new(
        Codec::RunEndEncoded(Arc::new(value_type), 32),
        Default::default(),
        None,
    );
    let mut decoder = Decoder::try_new(&ree_type).expect("create REE decoder");
    let inputs = ["a", "a", "bb", "bb", "bb", "a"];
    for s in inputs {
        let encoded = encode_avro_bytes(s.as_bytes());
        let mut cursor = AvroCursor::new(&encoded);
        decoder.decode(&mut cursor).expect("decode");
    }
    let flushed = decoder.flush(None).expect("flush");
    let runs = flushed
        .as_any()
        .downcast_ref::<RunArray<Int32Type>>()
        .expect("RunArray<Int32Type>");
    assert_eq!(runs.run_ends().values(), &[2, 5, 6]);
    let run_values = runs
        .values()
        .as_ref()
        .as_any()
        .downcast_ref::<StringArray>()
        .expect("values String");
    assert_eq!(run_values.len(), 3);
    for (index, expected) in ["a", "bb", "a"].into_iter().enumerate() {
        assert_eq!(run_values.value(index), expected);
    }
}
4633
/// Smoke test compiled only when `avro_custom_types` is disabled: a plain
/// Int32 decoder still decodes `1, 2, 3`.
#[cfg(not(feature = "avro_custom_types"))]
#[test]
fn test_no_custom_types_feature_smoke_decodes_plain_int32() {
    let int_type = avro_from_codec(Codec::Int32);
    let mut decoder = Decoder::try_new(&int_type).expect("create Int32 decoder");
    let inputs = [1, 2, 3];
    for v in inputs {
        let encoded = encode_avro_int(v);
        let mut cursor = AvroCursor::new(&encoded);
        decoder.decode(&mut cursor).expect("decode");
    }
    let flushed = decoder.flush(None).expect("flush");
    let ints = flushed
        .as_any()
        .downcast_ref::<Int32Array>()
        .expect("Int32Array");
    assert_eq!(ints.values(), &[1, 2, 3]);
}
4650}