1use crate::codec::{AvroDataType, Codec, Promotion, ResolutionInfo};
19use crate::reader::block::{Block, BlockDecoder};
20use crate::reader::cursor::AvroCursor;
21use crate::reader::header::Header;
22use crate::schema::*;
23use arrow_array::builder::{
24 ArrayBuilder, Decimal128Builder, Decimal256Builder, Decimal32Builder, Decimal64Builder,
25 IntervalMonthDayNanoBuilder, PrimitiveBuilder,
26};
27use arrow_array::types::*;
28use arrow_array::*;
29use arrow_buffer::*;
30use arrow_schema::{
31 ArrowError, DataType, Field as ArrowField, FieldRef, Fields, IntervalUnit,
32 Schema as ArrowSchema, SchemaRef, DECIMAL128_MAX_PRECISION, DECIMAL256_MAX_PRECISION,
33};
34#[cfg(feature = "small_decimals")]
35use arrow_schema::{DECIMAL32_MAX_PRECISION, DECIMAL64_MAX_PRECISION};
36use std::cmp::Ordering;
37use std::collections::HashMap;
38use std::io::Read;
39use std::sync::Arc;
40use uuid::Uuid;
41
42const DEFAULT_CAPACITY: usize = 1024;
43
/// Reads one Avro decimal from `$buf` and appends it to `$builder`.
///
/// The raw value is fetched through `read_decimal_bytes_be` as exactly `$N`
/// big-endian bytes (`$size` is the optional fixed width of the encoded value)
/// and then reinterpreted as the integer type `$Int`.
macro_rules! decode_decimal {
    ($size:expr, $buf:expr, $builder:expr, $N:expr, $Int:ty) => {{
        let be_bytes = read_decimal_bytes_be::<{ $N }>($buf, $size)?;
        let unscaled = <$Int>::from_be_bytes(be_bytes);
        $builder.append_value(unscaled);
    }};
}
51
/// Finishes a decimal builder and wraps its values in an `$ArrayTy` array.
///
/// Only the value buffer of the finished builder is kept; validity comes from
/// `$nulls`. Precision and scale (scale defaults to 0 when absent) are applied
/// afterwards, and any failure is reported as `ArrowError::ParseError`.
macro_rules! flush_decimal {
    ($builder:expr, $precision:expr, $scale:expr, $nulls:expr, $ArrayTy:ty) => {{
        let (_data_type, unscaled_values, _builder_nulls) = $builder.finish().into_parts();
        let untyped = <$ArrayTy>::new(unscaled_values, $nulls);
        let typed = untyped
            .with_precision_and_scale(*$precision as u8, $scale.unwrap_or(0) as i8)
            .map_err(|e| ArrowError::ParseError(e.to_string()))?;
        Arc::new(typed) as ArrayRef
    }};
}
62
63#[derive(Debug)]
64pub(crate) struct RecordDecoderBuilder<'a> {
65 data_type: &'a AvroDataType,
66 use_utf8view: bool,
67}
68
69impl<'a> RecordDecoderBuilder<'a> {
70 pub(crate) fn new(data_type: &'a AvroDataType) -> Self {
71 Self {
72 data_type,
73 use_utf8view: false,
74 }
75 }
76
77 pub(crate) fn with_utf8_view(mut self, use_utf8view: bool) -> Self {
78 self.use_utf8view = use_utf8view;
79 self
80 }
81
82 pub(crate) fn build(self) -> Result<RecordDecoder, ArrowError> {
84 RecordDecoder::try_new_with_options(self.data_type, self.use_utf8view)
85 }
86}
87
/// Decodes Avro-encoded records into per-field buffers and flushes them as a
/// [`RecordBatch`].
#[derive(Debug)]
pub(crate) struct RecordDecoder {
    /// Arrow schema of the batches produced by `flush`.
    schema: SchemaRef,
    /// One decoder per reader field, in schema order.
    fields: Vec<Decoder>,
    /// Option value this decoder was constructed with.
    use_utf8view: bool,
    /// Writer-to-reader mapping, present only when the data type carries
    /// record resolution information.
    resolved: Option<ResolvedRuntime>,
}
96
/// Runtime state used to decode rows whose writer fields differ from the
/// reader fields.
#[derive(Debug)]
struct ResolvedRuntime {
    /// Indexed by writer field position: `Some(i)` decodes the value into
    /// reader field `i`, `None` means the value is skipped.
    writer_to_reader: Arc<[Option<usize>]>,
    /// Indexed by writer field position: the skipper used for fields that have
    /// no reader counterpart.
    skip_decoders: Vec<Option<Skipper>>,
}
104
105impl RecordDecoder {
106 pub(crate) fn new(data_type: &'_ AvroDataType) -> Self {
108 RecordDecoderBuilder::new(data_type).build().unwrap()
109 }
110
111 pub(crate) fn try_new(data_type: &AvroDataType) -> Result<Self, ArrowError> {
113 RecordDecoderBuilder::new(data_type)
114 .with_utf8_view(true)
115 .build()
116 }
117
118 pub(crate) fn try_new_with_options(
129 data_type: &AvroDataType,
130 use_utf8view: bool,
131 ) -> Result<Self, ArrowError> {
132 match data_type.codec() {
133 Codec::Struct(reader_fields) => {
134 let mut arrow_fields = Vec::with_capacity(reader_fields.len());
136 let mut encodings = Vec::with_capacity(reader_fields.len());
137 for avro_field in reader_fields.iter() {
138 arrow_fields.push(avro_field.field());
139 encodings.push(Decoder::try_new(avro_field.data_type())?);
140 }
141 let resolved = match data_type.resolution.as_ref() {
143 Some(ResolutionInfo::Record(rec)) => {
144 let skip_decoders = build_skip_decoders(&rec.skip_fields)?;
145 Some(ResolvedRuntime {
146 writer_to_reader: rec.writer_to_reader.clone(),
147 skip_decoders,
148 })
149 }
150 _ => None,
151 };
152 Ok(Self {
153 schema: Arc::new(ArrowSchema::new(arrow_fields)),
154 fields: encodings,
155 use_utf8view,
156 resolved,
157 })
158 }
159 other => Err(ArrowError::ParseError(format!(
160 "Expected record got {other:?}"
161 ))),
162 }
163 }
164
165 pub(crate) fn schema(&self) -> &SchemaRef {
167 &self.schema
168 }
169
170 pub(crate) fn decode(&mut self, buf: &[u8], count: usize) -> Result<usize, ArrowError> {
172 let mut cursor = AvroCursor::new(buf);
173 match self.resolved.as_mut() {
174 Some(runtime) => {
175 for _ in 0..count {
178 decode_with_resolution(
179 &mut cursor,
180 &mut self.fields,
181 &runtime.writer_to_reader,
182 &mut runtime.skip_decoders,
183 )?;
184 }
185 }
186 None => {
187 for _ in 0..count {
188 for field in &mut self.fields {
189 field.decode(&mut cursor)?;
190 }
191 }
192 }
193 }
194 Ok(cursor.position())
195 }
196
197 pub(crate) fn flush(&mut self) -> Result<RecordBatch, ArrowError> {
199 let arrays = self
200 .fields
201 .iter_mut()
202 .map(|x| x.flush(None))
203 .collect::<Result<Vec<_>, _>>()?;
204 RecordBatch::try_new(self.schema.clone(), arrays)
205 }
206}
207
208fn decode_with_resolution(
209 buf: &mut AvroCursor<'_>,
210 encodings: &mut [Decoder],
211 writer_to_reader: &[Option<usize>],
212 skippers: &mut [Option<Skipper>],
213) -> Result<(), ArrowError> {
214 for (w_idx, (target, skipper_opt)) in writer_to_reader.iter().zip(skippers).enumerate() {
215 match (*target, skipper_opt.as_mut()) {
216 (Some(r_idx), _) => encodings[r_idx].decode(buf)?,
217 (None, Some(sk)) => sk.skip(buf)?,
218 (None, None) => {
219 return Err(ArrowError::SchemaError(format!(
220 "No skipper available for writer-only field at index {w_idx}",
221 )));
222 }
223 }
224 }
225 Ok(())
226}
227
/// Per-field decoding state: each variant owns the buffers that accumulate
/// decoded values until `flush` turns them into an Arrow array.
#[derive(Debug)]
enum Decoder {
    /// Number of rows seen; flushed as a `NullArray` of that length.
    Null(usize),
    Boolean(BooleanBufferBuilder),
    Int32(Vec<i32>),
    Int64(Vec<i64>),
    Float32(Vec<f32>),
    Float64(Vec<f64>),
    Date32(Vec<i32>),
    TimeMillis(Vec<i32>),
    TimeMicros(Vec<i64>),
    /// `(is_utc, values)`; a UTC timestamp is flushed with timezone "+00:00".
    TimestampMillis(bool, Vec<i64>),
    /// `(is_utc, values)`; a UTC timestamp is flushed with timezone "+00:00".
    TimestampMicros(bool, Vec<i64>),
    // Promotion variants: the value is read as the first type and stored as
    // the second.
    Int32ToInt64(Vec<i64>),
    Int32ToFloat32(Vec<f32>),
    Int32ToFloat64(Vec<f64>),
    Int64ToFloat32(Vec<f32>),
    Int64ToFloat64(Vec<f64>),
    Float32ToFloat64(Vec<f64>),
    /// `(offsets, bytes)`; flushed as a `StringArray`.
    BytesToString(OffsetBufferBuilder<i32>, Vec<u8>),
    /// `(offsets, bytes)`; flushed as a `BinaryArray`.
    StringToBytes(OffsetBufferBuilder<i32>, Vec<u8>),
    /// `(offsets, bytes)`; flushed as a `BinaryArray`.
    Binary(OffsetBufferBuilder<i32>, Vec<u8>),
    /// `(offsets, bytes)`; flushed as a `StringArray`.
    String(OffsetBufferBuilder<i32>, Vec<u8>),
    /// `(offsets, bytes)`; flushed as a `StringViewArray`.
    StringView(OffsetBufferBuilder<i32>, Vec<u8>),
    /// `(item field, list offsets, item decoder)`.
    Array(FieldRef, OffsetBufferBuilder<i32>, Box<Decoder>),
    /// `(child fields, child decoders)`.
    Record(Fields, Vec<Decoder>),
    /// `(entries field, key offsets, map offsets, key bytes, value decoder)`.
    Map(
        FieldRef,
        OffsetBufferBuilder<i32>,
        OffsetBufferBuilder<i32>,
        Vec<u8>,
        Box<Decoder>,
    ),
    /// `(byte width, accumulated bytes)`.
    Fixed(i32, Vec<u8>),
    /// `(symbol indices, symbols)`; flushed as a dictionary array.
    Enum(Vec<i32>, Arc<[String]>),
    Duration(IntervalMonthDayNanoBuilder),
    /// Accumulated 16-byte UUID values.
    Uuid(Vec<u8>),
    // Decimal variants: `(precision, scale, fixed size, builder)`.
    Decimal32(usize, Option<usize>, Option<usize>, Decimal32Builder),
    Decimal64(usize, Option<usize>, Option<usize>, Decimal64Builder),
    Decimal128(usize, Option<usize>, Option<usize>, Decimal128Builder),
    Decimal256(usize, Option<usize>, Option<usize>, Decimal256Builder),
    /// `(null branch order, validity, inner decoder)`.
    Nullable(Nullability, NullBufferBuilder, Box<Decoder>),
    /// Enum whose writer symbol indices are translated through `mapping`.
    EnumResolved {
        indices: Vec<i32>,
        symbols: Arc<[String]>,
        /// Indexed by writer symbol index; negative entries are treated as
        /// unmapped during decoding.
        mapping: Arc<[i32]>,
        /// Index used when the writer index has no mapping; a negative value
        /// makes such rows an error.
        default_index: i32,
    },
    /// Record whose rows are laid out in writer field order.
    RecordResolved {
        fields: Fields,
        encodings: Vec<Decoder>,
        /// Indexed by writer field position: target reader field, if any.
        writer_to_reader: Arc<[Option<usize>]>,
        /// Indexed by writer field position: skipper for writer-only fields.
        skip_decoders: Vec<Option<Skipper>>,
    },
}
286
287impl Decoder {
    /// Builds the decoder matching `data_type`.
    ///
    /// The variant is selected from the codec plus any promotion recorded in the
    /// type's resolution info; nested types recurse into their children. If the
    /// type reports a nullability, the result is wrapped in `Self::Nullable`.
    ///
    /// # Errors
    /// Returns `ArrowError::ParseError` for a decimal precision above the largest
    /// supported width and propagates errors from child decoders, decimal
    /// builders, and skipper construction.
    fn try_new(data_type: &AvroDataType) -> Result<Self, ArrowError> {
        // Only promotions influence the primitive variant choice below.
        let promotion = match data_type.resolution.as_ref() {
            Some(ResolutionInfo::Promotion(p)) => Some(p),
            _ => None,
        };
        let decoder = match (data_type.codec(), promotion) {
            // Promotion arms come first so they win over the plain codec arms.
            (Codec::Int64, Some(Promotion::IntToLong)) => {
                Self::Int32ToInt64(Vec::with_capacity(DEFAULT_CAPACITY))
            }
            (Codec::Float32, Some(Promotion::IntToFloat)) => {
                Self::Int32ToFloat32(Vec::with_capacity(DEFAULT_CAPACITY))
            }
            (Codec::Float64, Some(Promotion::IntToDouble)) => {
                Self::Int32ToFloat64(Vec::with_capacity(DEFAULT_CAPACITY))
            }
            (Codec::Float32, Some(Promotion::LongToFloat)) => {
                Self::Int64ToFloat32(Vec::with_capacity(DEFAULT_CAPACITY))
            }
            (Codec::Float64, Some(Promotion::LongToDouble)) => {
                Self::Int64ToFloat64(Vec::with_capacity(DEFAULT_CAPACITY))
            }
            (Codec::Float64, Some(Promotion::FloatToDouble)) => {
                Self::Float32ToFloat64(Vec::with_capacity(DEFAULT_CAPACITY))
            }
            (Codec::Utf8, Some(Promotion::BytesToString))
            | (Codec::Utf8View, Some(Promotion::BytesToString)) => Self::BytesToString(
                OffsetBufferBuilder::new(DEFAULT_CAPACITY),
                Vec::with_capacity(DEFAULT_CAPACITY),
            ),
            (Codec::Binary, Some(Promotion::StringToBytes)) => Self::StringToBytes(
                OffsetBufferBuilder::new(DEFAULT_CAPACITY),
                Vec::with_capacity(DEFAULT_CAPACITY),
            ),
            // Plain codecs: any remaining promotion value is ignored.
            (Codec::Null, _) => Self::Null(0),
            (Codec::Boolean, _) => Self::Boolean(BooleanBufferBuilder::new(DEFAULT_CAPACITY)),
            (Codec::Int32, _) => Self::Int32(Vec::with_capacity(DEFAULT_CAPACITY)),
            (Codec::Int64, _) => Self::Int64(Vec::with_capacity(DEFAULT_CAPACITY)),
            (Codec::Float32, _) => Self::Float32(Vec::with_capacity(DEFAULT_CAPACITY)),
            (Codec::Float64, _) => Self::Float64(Vec::with_capacity(DEFAULT_CAPACITY)),
            (Codec::Binary, _) => Self::Binary(
                OffsetBufferBuilder::new(DEFAULT_CAPACITY),
                Vec::with_capacity(DEFAULT_CAPACITY),
            ),
            (Codec::Utf8, _) => Self::String(
                OffsetBufferBuilder::new(DEFAULT_CAPACITY),
                Vec::with_capacity(DEFAULT_CAPACITY),
            ),
            (Codec::Utf8View, _) => Self::StringView(
                OffsetBufferBuilder::new(DEFAULT_CAPACITY),
                Vec::with_capacity(DEFAULT_CAPACITY),
            ),
            (Codec::Date32, _) => Self::Date32(Vec::with_capacity(DEFAULT_CAPACITY)),
            (Codec::TimeMillis, _) => Self::TimeMillis(Vec::with_capacity(DEFAULT_CAPACITY)),
            (Codec::TimeMicros, _) => Self::TimeMicros(Vec::with_capacity(DEFAULT_CAPACITY)),
            (Codec::TimestampMillis(is_utc), _) => {
                Self::TimestampMillis(*is_utc, Vec::with_capacity(DEFAULT_CAPACITY))
            }
            (Codec::TimestampMicros(is_utc), _) => {
                Self::TimestampMicros(*is_utc, Vec::with_capacity(DEFAULT_CAPACITY))
            }
            (Codec::Fixed(sz), _) => Self::Fixed(*sz, Vec::with_capacity(DEFAULT_CAPACITY)),
            (Codec::Decimal(precision, scale, size), _) => {
                let p = *precision;
                let s = *scale;
                // The narrowed values are only used after `p` passed one of the
                // range checks below; a missing scale is treated as 0.
                let prec = p as u8;
                let scl = s.unwrap_or(0) as i8;
                // With `small_decimals`, the narrowest width that fits the
                // precision is chosen, starting at 32 bits.
                #[cfg(feature = "small_decimals")]
                {
                    if p <= DECIMAL32_MAX_PRECISION as usize {
                        let builder = Decimal32Builder::with_capacity(DEFAULT_CAPACITY)
                            .with_precision_and_scale(prec, scl)?;
                        Self::Decimal32(p, s, *size, builder)
                    } else if p <= DECIMAL64_MAX_PRECISION as usize {
                        let builder = Decimal64Builder::with_capacity(DEFAULT_CAPACITY)
                            .with_precision_and_scale(prec, scl)?;
                        Self::Decimal64(p, s, *size, builder)
                    } else if p <= DECIMAL128_MAX_PRECISION as usize {
                        let builder = Decimal128Builder::with_capacity(DEFAULT_CAPACITY)
                            .with_precision_and_scale(prec, scl)?;
                        Self::Decimal128(p, s, *size, builder)
                    } else if p <= DECIMAL256_MAX_PRECISION as usize {
                        let builder = Decimal256Builder::with_capacity(DEFAULT_CAPACITY)
                            .with_precision_and_scale(prec, scl)?;
                        Self::Decimal256(p, s, *size, builder)
                    } else {
                        return Err(ArrowError::ParseError(format!(
                            "Decimal precision {p} exceeds maximum supported"
                        )));
                    }
                }
                // Without the feature, only the 128- and 256-bit widths are used.
                #[cfg(not(feature = "small_decimals"))]
                {
                    if p <= DECIMAL128_MAX_PRECISION as usize {
                        let builder = Decimal128Builder::with_capacity(DEFAULT_CAPACITY)
                            .with_precision_and_scale(prec, scl)?;
                        Self::Decimal128(p, s, *size, builder)
                    } else if p <= DECIMAL256_MAX_PRECISION as usize {
                        let builder = Decimal256Builder::with_capacity(DEFAULT_CAPACITY)
                            .with_precision_and_scale(prec, scl)?;
                        Self::Decimal256(p, s, *size, builder)
                    } else {
                        return Err(ArrowError::ParseError(format!(
                            "Decimal precision {p} exceeds maximum supported"
                        )));
                    }
                }
            }
            (Codec::Interval, _) => Self::Duration(IntervalMonthDayNanoBuilder::new()),
            (Codec::List(item), _) => {
                let decoder = Self::try_new(item)?;
                Self::Array(
                    Arc::new(item.field_with_name("item")),
                    OffsetBufferBuilder::new(DEFAULT_CAPACITY),
                    Box::new(decoder),
                )
            }
            (Codec::Enum(symbols), _) => {
                // An enum mapping in the resolution info selects the resolving variant.
                if let Some(ResolutionInfo::EnumMapping(mapping)) = data_type.resolution.as_ref() {
                    Self::EnumResolved {
                        indices: Vec::with_capacity(DEFAULT_CAPACITY),
                        symbols: symbols.clone(),
                        mapping: mapping.mapping.clone(),
                        default_index: mapping.default_index,
                    }
                } else {
                    Self::Enum(Vec::with_capacity(DEFAULT_CAPACITY), symbols.clone())
                }
            }
            (Codec::Struct(fields), _) => {
                let mut arrow_fields = Vec::with_capacity(fields.len());
                let mut encodings = Vec::with_capacity(fields.len());
                for avro_field in fields.iter() {
                    let encoding = Self::try_new(avro_field.data_type())?;
                    arrow_fields.push(avro_field.field());
                    encodings.push(encoding);
                }
                // Record resolution info adds the writer-order mapping and skippers.
                if let Some(ResolutionInfo::Record(rec)) = data_type.resolution.as_ref() {
                    let skip_decoders = build_skip_decoders(&rec.skip_fields)?;
                    Self::RecordResolved {
                        fields: arrow_fields.into(),
                        encodings,
                        writer_to_reader: rec.writer_to_reader.clone(),
                        skip_decoders,
                    }
                } else {
                    Self::Record(arrow_fields.into(), encodings)
                }
            }
            (Codec::Map(child), _) => {
                // Entries are a non-nullable struct of a non-nullable Utf8 "key"
                // and the child's "value" field.
                let val_field = child.field_with_name("value");
                let map_field = Arc::new(ArrowField::new(
                    "entries",
                    DataType::Struct(Fields::from(vec![
                        ArrowField::new("key", DataType::Utf8, false),
                        val_field,
                    ])),
                    false,
                ));
                let val_dec = Self::try_new(child)?;
                Self::Map(
                    map_field,
                    OffsetBufferBuilder::new(DEFAULT_CAPACITY),
                    OffsetBufferBuilder::new(DEFAULT_CAPACITY),
                    Vec::with_capacity(DEFAULT_CAPACITY),
                    Box::new(val_dec),
                )
            }
            (Codec::Uuid, _) => Self::Uuid(Vec::with_capacity(DEFAULT_CAPACITY)),
        };
        // Nullable types get a validity builder wrapped around the decoder.
        Ok(match data_type.nullability() {
            Some(nullability) => Self::Nullable(
                nullability,
                NullBufferBuilder::new(DEFAULT_CAPACITY),
                Box::new(decoder),
            ),
            None => decoder,
        })
    }
467
468 fn append_null(&mut self) {
470 match self {
471 Self::Null(count) => *count += 1,
472 Self::Boolean(b) => b.append(false),
473 Self::Int32(v) | Self::Date32(v) | Self::TimeMillis(v) => v.push(0),
474 Self::Int64(v)
475 | Self::Int32ToInt64(v)
476 | Self::TimeMicros(v)
477 | Self::TimestampMillis(_, v)
478 | Self::TimestampMicros(_, v) => v.push(0),
479 Self::Float32(v) | Self::Int32ToFloat32(v) | Self::Int64ToFloat32(v) => v.push(0.),
480 Self::Float64(v)
481 | Self::Int32ToFloat64(v)
482 | Self::Int64ToFloat64(v)
483 | Self::Float32ToFloat64(v) => v.push(0.),
484 Self::Binary(offsets, _)
485 | Self::String(offsets, _)
486 | Self::StringView(offsets, _)
487 | Self::BytesToString(offsets, _)
488 | Self::StringToBytes(offsets, _) => {
489 offsets.push_length(0);
490 }
491 Self::Uuid(v) => {
492 v.extend([0; 16]);
493 }
494 Self::Array(_, offsets, e) => {
495 offsets.push_length(0);
496 }
497 Self::Record(_, e) => e.iter_mut().for_each(|e| e.append_null()),
498 Self::Map(_, _koff, moff, _, _) => {
499 moff.push_length(0);
500 }
501 Self::Fixed(sz, accum) => {
502 accum.extend(std::iter::repeat_n(0u8, *sz as usize));
503 }
504 Self::Decimal32(_, _, _, builder) => builder.append_value(0),
505 Self::Decimal64(_, _, _, builder) => builder.append_value(0),
506 Self::Decimal128(_, _, _, builder) => builder.append_value(0),
507 Self::Decimal256(_, _, _, builder) => builder.append_value(i256::ZERO),
508 Self::Enum(indices, _) => indices.push(0),
509 Self::EnumResolved { indices, .. } => indices.push(0),
510 Self::Duration(builder) => builder.append_null(),
511 Self::Nullable(_, null_buffer, inner) => {
512 null_buffer.append(false);
513 inner.append_null();
514 }
515 Self::RecordResolved { encodings, .. } => {
516 encodings.iter_mut().for_each(|e| e.append_null());
517 }
518 }
519 }
520
    /// Decodes one value from `buf` and appends it to this decoder's buffers.
    ///
    /// # Errors
    /// Propagates cursor read errors and returns `ArrowError::ParseError` for a
    /// UUID that is not valid UTF-8 or does not parse, and for an enum index
    /// that cannot be resolved when no default is available.
    fn decode(&mut self, buf: &mut AvroCursor<'_>) -> Result<(), ArrowError> {
        match self {
            // Nothing is read for a null value; only the row count grows.
            Self::Null(x) => *x += 1,
            Self::Boolean(values) => values.append(buf.get_bool()?),
            Self::Int32(values) | Self::Date32(values) | Self::TimeMillis(values) => {
                values.push(buf.get_int()?)
            }
            Self::Int64(values)
            | Self::TimeMicros(values)
            | Self::TimestampMillis(_, values)
            | Self::TimestampMicros(_, values) => values.push(buf.get_long()?),
            Self::Float32(values) => values.push(buf.get_float()?),
            Self::Float64(values) => values.push(buf.get_double()?),
            // Promotions: read the narrower type, store it widened with `as`.
            Self::Int32ToInt64(values) => values.push(buf.get_int()? as i64),
            Self::Int32ToFloat32(values) => values.push(buf.get_int()? as f32),
            Self::Int32ToFloat64(values) => values.push(buf.get_int()? as f64),
            Self::Int64ToFloat32(values) => values.push(buf.get_long()? as f32),
            Self::Int64ToFloat64(values) => values.push(buf.get_long()? as f64),
            Self::Float32ToFloat64(values) => values.push(buf.get_float()? as f64),
            // All byte/string flavours are read the same way; they differ only
            // in the array type produced by `flush`.
            Self::StringToBytes(offsets, values)
            | Self::BytesToString(offsets, values)
            | Self::Binary(offsets, values)
            | Self::String(offsets, values)
            | Self::StringView(offsets, values) => {
                let data = buf.get_bytes()?;
                offsets.push_length(data.len());
                values.extend_from_slice(data);
            }
            Self::Uuid(values) => {
                // The UUID is read as text and stored as its 16 raw bytes.
                let s_bytes = buf.get_bytes()?;
                let s = std::str::from_utf8(s_bytes).map_err(|e| {
                    ArrowError::ParseError(format!("UUID bytes are not valid UTF-8: {e}"))
                })?;
                let uuid = Uuid::try_parse(s)
                    .map_err(|e| ArrowError::ParseError(format!("Failed to parse uuid: {e}")))?;
                values.extend_from_slice(uuid.as_bytes());
            }
            Self::Array(_, off, encoding) => {
                // The list slot length is the total item count over all blocks.
                let total_items = read_blocks(buf, |cursor| encoding.decode(cursor))?;
                off.push_length(total_items);
            }
            Self::Record(_, encodings) => {
                for encoding in encodings {
                    encoding.decode(buf)?;
                }
            }
            Self::Map(_, koff, moff, kdata, valdec) => {
                // Each entry is a key (read as bytes) followed by its value.
                let newly_added = read_blocks(buf, |cur| {
                    let kb = cur.get_bytes()?;
                    koff.push_length(kb.len());
                    kdata.extend_from_slice(kb);
                    valdec.decode(cur)
                })?;
                moff.push_length(newly_added);
            }
            Self::Fixed(sz, accum) => {
                let fx = buf.get_fixed(*sz as usize)?;
                accum.extend_from_slice(fx);
            }
            // Decimals: the macro reads the value as N big-endian bytes and
            // appends it as the matching integer type.
            Self::Decimal32(_, _, size, builder) => {
                decode_decimal!(size, buf, builder, 4, i32);
            }
            Self::Decimal64(_, _, size, builder) => {
                decode_decimal!(size, buf, builder, 8, i64);
            }
            Self::Decimal128(_, _, size, builder) => {
                decode_decimal!(size, buf, builder, 16, i128);
            }
            Self::Decimal256(_, _, size, builder) => {
                decode_decimal!(size, buf, builder, 32, i256);
            }
            Self::Enum(indices, _) => {
                indices.push(buf.get_int()?);
            }
            Self::EnumResolved {
                indices,
                mapping,
                default_index,
                ..
            } => {
                let raw = buf.get_int()?;
                // A negative or out-of-range writer index, or a negative mapping
                // entry, falls back to the default index.
                let resolved = usize::try_from(raw)
                    .ok()
                    .and_then(|idx| mapping.get(idx).copied())
                    .filter(|&idx| idx >= 0)
                    .unwrap_or(*default_index);
                // A negative default means there is nothing to fall back to.
                if resolved >= 0 {
                    indices.push(resolved);
                } else {
                    return Err(ArrowError::ParseError(format!(
                        "Enum symbol index {raw} not resolvable and no default provided",
                    )));
                }
            }
            Self::Duration(builder) => {
                // 12 bytes: three little-endian u32 values (months, days, millis).
                let b = buf.get_fixed(12)?;
                let months = u32::from_le_bytes(b[0..4].try_into().unwrap());
                let days = u32::from_le_bytes(b[4..8].try_into().unwrap());
                let millis = u32::from_le_bytes(b[8..12].try_into().unwrap());
                let nanos = (millis as i64) * 1_000_000;
                builder.append_value(IntervalMonthDayNano::new(months as i32, days as i32, nanos));
            }
            Self::Nullable(order, nb, encoding) => {
                // The branch value tells whether the null branch was written;
                // which value means "null" depends on the branch order.
                let branch = buf.read_vlq()?;
                let is_not_null = match *order {
                    Nullability::NullFirst => branch != 0,
                    Nullability::NullSecond => branch == 0,
                };
                if is_not_null {
                    encoding.decode(buf)?;
                } else {
                    encoding.append_null();
                }
                // The validity bit is appended last, so an early return from a
                // failed child decode leaves the validity buffer untouched.
                nb.append(is_not_null);
            }
            Self::RecordResolved {
                encodings,
                writer_to_reader,
                skip_decoders,
                ..
            } => {
                decode_with_resolution(buf, encodings, writer_to_reader, skip_decoders)?;
            }
        }
        Ok(())
    }
649
650 fn flush(&mut self, nulls: Option<NullBuffer>) -> Result<ArrayRef, ArrowError> {
652 Ok(match self {
653 Self::Nullable(_, n, e) => e.flush(n.finish())?,
654 Self::Null(size) => Arc::new(NullArray::new(std::mem::replace(size, 0))),
655 Self::Boolean(b) => Arc::new(BooleanArray::new(b.finish(), nulls)),
656 Self::Int32(values) => Arc::new(flush_primitive::<Int32Type>(values, nulls)),
657 Self::Date32(values) => Arc::new(flush_primitive::<Date32Type>(values, nulls)),
658 Self::Int64(values) => Arc::new(flush_primitive::<Int64Type>(values, nulls)),
659 Self::TimeMillis(values) => {
660 Arc::new(flush_primitive::<Time32MillisecondType>(values, nulls))
661 }
662 Self::TimeMicros(values) => {
663 Arc::new(flush_primitive::<Time64MicrosecondType>(values, nulls))
664 }
665 Self::TimestampMillis(is_utc, values) => Arc::new(
666 flush_primitive::<TimestampMillisecondType>(values, nulls)
667 .with_timezone_opt(is_utc.then(|| "+00:00")),
668 ),
669 Self::TimestampMicros(is_utc, values) => Arc::new(
670 flush_primitive::<TimestampMicrosecondType>(values, nulls)
671 .with_timezone_opt(is_utc.then(|| "+00:00")),
672 ),
673 Self::Float32(values) => Arc::new(flush_primitive::<Float32Type>(values, nulls)),
674 Self::Float64(values) => Arc::new(flush_primitive::<Float64Type>(values, nulls)),
675 Self::Int32ToInt64(values) => Arc::new(flush_primitive::<Int64Type>(values, nulls)),
676 Self::Int32ToFloat32(values) | Self::Int64ToFloat32(values) => {
677 Arc::new(flush_primitive::<Float32Type>(values, nulls))
678 }
679 Self::Int32ToFloat64(values)
680 | Self::Int64ToFloat64(values)
681 | Self::Float32ToFloat64(values) => {
682 Arc::new(flush_primitive::<Float64Type>(values, nulls))
683 }
684 Self::StringToBytes(offsets, values) | Self::Binary(offsets, values) => {
685 let offsets = flush_offsets(offsets);
686 let values = flush_values(values).into();
687 Arc::new(BinaryArray::new(offsets, values, nulls))
688 }
689 Self::BytesToString(offsets, values) | Self::String(offsets, values) => {
690 let offsets = flush_offsets(offsets);
691 let values = flush_values(values).into();
692 Arc::new(StringArray::new(offsets, values, nulls))
693 }
694 Self::StringView(offsets, values) => {
695 let offsets = flush_offsets(offsets);
696 let values = flush_values(values);
697 let array = StringArray::new(offsets, values.into(), nulls.clone());
698 let values: Vec<&str> = (0..array.len())
699 .map(|i| {
700 if array.is_valid(i) {
701 array.value(i)
702 } else {
703 ""
704 }
705 })
706 .collect();
707 Arc::new(StringViewArray::from(values))
708 }
709 Self::Array(field, offsets, values) => {
710 let values = values.flush(None)?;
711 let offsets = flush_offsets(offsets);
712 Arc::new(ListArray::new(field.clone(), offsets, values, nulls))
713 }
714 Self::Record(fields, encodings) => {
715 let arrays = encodings
716 .iter_mut()
717 .map(|x| x.flush(None))
718 .collect::<Result<Vec<_>, _>>()?;
719 Arc::new(StructArray::new(fields.clone(), arrays, nulls))
720 }
721 Self::Map(map_field, k_off, m_off, kdata, valdec) => {
722 let moff = flush_offsets(m_off);
723 let koff = flush_offsets(k_off);
724 let kd = flush_values(kdata).into();
725 let val_arr = valdec.flush(None)?;
726 let key_arr = StringArray::new(koff, kd, None);
727 if key_arr.len() != val_arr.len() {
728 return Err(ArrowError::InvalidArgumentError(format!(
729 "Map keys length ({}) != map values length ({})",
730 key_arr.len(),
731 val_arr.len()
732 )));
733 }
734 let final_len = moff.len() - 1;
735 if let Some(n) = &nulls {
736 if n.len() != final_len {
737 return Err(ArrowError::InvalidArgumentError(format!(
738 "Map array null buffer length {} != final map length {final_len}",
739 n.len()
740 )));
741 }
742 }
743 let entries_fields = match map_field.data_type() {
744 DataType::Struct(fields) => fields.clone(),
745 other => {
746 return Err(ArrowError::InvalidArgumentError(format!(
747 "Map entries field must be a Struct, got {other:?}"
748 )))
749 }
750 };
751 let entries_struct =
752 StructArray::new(entries_fields, vec![Arc::new(key_arr), val_arr], None);
753 let map_arr = MapArray::new(map_field.clone(), moff, entries_struct, nulls, false);
754 Arc::new(map_arr)
755 }
756 Self::Fixed(sz, accum) => {
757 let b: Buffer = flush_values(accum).into();
758 let arr = FixedSizeBinaryArray::try_new(*sz, b, nulls)
759 .map_err(|e| ArrowError::ParseError(e.to_string()))?;
760 Arc::new(arr)
761 }
762 Self::Uuid(values) => {
763 let arr = FixedSizeBinaryArray::try_new(16, std::mem::take(values).into(), nulls)
764 .map_err(|e| ArrowError::ParseError(e.to_string()))?;
765 Arc::new(arr)
766 }
767 Self::Decimal32(precision, scale, _, builder) => {
768 flush_decimal!(builder, precision, scale, nulls, Decimal32Array)
769 }
770 Self::Decimal64(precision, scale, _, builder) => {
771 flush_decimal!(builder, precision, scale, nulls, Decimal64Array)
772 }
773 Self::Decimal128(precision, scale, _, builder) => {
774 flush_decimal!(builder, precision, scale, nulls, Decimal128Array)
775 }
776 Self::Decimal256(precision, scale, _, builder) => {
777 flush_decimal!(builder, precision, scale, nulls, Decimal256Array)
778 }
779 Self::Enum(indices, symbols) => flush_dict(indices, symbols, nulls)?,
780 Self::EnumResolved {
781 indices, symbols, ..
782 } => flush_dict(indices, symbols, nulls)?,
783 Self::Duration(builder) => {
784 let (_, vals, _) = builder.finish().into_parts();
785 let vals = IntervalMonthDayNanoArray::try_new(vals, nulls)
786 .map_err(|e| ArrowError::ParseError(e.to_string()))?;
787 Arc::new(vals)
788 }
789 Self::RecordResolved {
790 fields, encodings, ..
791 } => {
792 let arrays = encodings
793 .iter_mut()
794 .map(|x| x.flush(None))
795 .collect::<Result<Vec<_>, _>>()?;
796 Arc::new(StructArray::new(fields.clone(), arrays, nulls))
797 }
798 })
799 }
800}
801
/// How `process_blockwise` handles a block that has a negative item count
/// (such a block is followed by its size in bytes).
#[derive(Debug, Copy, Clone)]
enum NegativeBlockBehavior {
    /// Invoke the item callback once per item; the byte size is read but unused.
    ProcessItems,
    /// Advance past the whole block using its byte size, without invoking the
    /// item callback.
    SkipBySize,
}
807
808#[inline]
809fn skip_blocks(
810 buf: &mut AvroCursor,
811 mut skip_item: impl FnMut(&mut AvroCursor) -> Result<(), ArrowError>,
812) -> Result<usize, ArrowError> {
813 process_blockwise(
814 buf,
815 move |c| skip_item(c),
816 NegativeBlockBehavior::SkipBySize,
817 )
818}
819
820#[inline]
821fn flush_dict(
822 indices: &mut Vec<i32>,
823 symbols: &[String],
824 nulls: Option<NullBuffer>,
825) -> Result<ArrayRef, ArrowError> {
826 let keys = flush_primitive::<Int32Type>(indices, nulls);
827 let values = Arc::new(StringArray::from_iter_values(
828 symbols.iter().map(|s| s.as_str()),
829 ));
830 DictionaryArray::try_new(keys, values)
831 .map_err(|e| ArrowError::ParseError(e.to_string()))
832 .map(|arr| Arc::new(arr) as ArrayRef)
833}
834
835#[inline]
836fn read_blocks(
837 buf: &mut AvroCursor,
838 decode_entry: impl FnMut(&mut AvroCursor) -> Result<(), ArrowError>,
839) -> Result<usize, ArrowError> {
840 process_blockwise(buf, decode_entry, NegativeBlockBehavior::ProcessItems)
841}
842
843#[inline]
844fn process_blockwise(
845 buf: &mut AvroCursor,
846 mut on_item: impl FnMut(&mut AvroCursor) -> Result<(), ArrowError>,
847 negative_behavior: NegativeBlockBehavior,
848) -> Result<usize, ArrowError> {
849 let mut total = 0usize;
850 loop {
851 let block_count = buf.get_long()?;
856 match block_count.cmp(&0) {
857 Ordering::Equal => break,
858 Ordering::Less => {
859 let count = (-block_count) as usize;
860 let size_in_bytes = buf.get_long()? as usize;
862 match negative_behavior {
863 NegativeBlockBehavior::ProcessItems => {
864 for _ in 0..count {
866 on_item(buf)?;
867 }
868 }
869 NegativeBlockBehavior::SkipBySize => {
870 let _ = buf.get_fixed(size_in_bytes)?;
872 }
873 }
874 total += count;
875 }
876 Ordering::Greater => {
877 let count = block_count as usize;
878 for _ in 0..count {
879 on_item(buf)?;
880 }
881 total += count;
882 }
883 }
884 }
885 Ok(total)
886}
887
888#[inline]
889fn flush_values<T>(values: &mut Vec<T>) -> Vec<T> {
890 std::mem::replace(values, Vec::with_capacity(DEFAULT_CAPACITY))
891}
892
893#[inline]
894fn flush_offsets(offsets: &mut OffsetBufferBuilder<i32>) -> OffsetBuffer<i32> {
895 std::mem::replace(offsets, OffsetBufferBuilder::new(DEFAULT_CAPACITY)).finish()
896}
897
898#[inline]
899fn flush_primitive<T: ArrowPrimitiveType>(
900 values: &mut Vec<T::Native>,
901 nulls: Option<NullBuffer>,
902) -> PrimitiveArray<T> {
903 PrimitiveArray::new(flush_values(values).into(), nulls)
904}
905
906#[inline]
907fn read_decimal_bytes_be<const N: usize>(
908 buf: &mut AvroCursor<'_>,
909 size: &Option<usize>,
910) -> Result<[u8; N], ArrowError> {
911 match size {
912 Some(n) if *n == N => {
913 let raw = buf.get_fixed(N)?;
914 let mut arr = [0u8; N];
915 arr.copy_from_slice(raw);
916 Ok(arr)
917 }
918 Some(n) => {
919 let raw = buf.get_fixed(*n)?;
920 sign_cast_to::<N>(raw)
921 }
922 None => {
923 let raw = buf.get_bytes()?;
924 sign_cast_to::<N>(raw)
925 }
926 }
927}
928
929#[inline]
938fn sign_cast_to<const N: usize>(raw: &[u8]) -> Result<[u8; N], ArrowError> {
939 let len = raw.len();
940 if len == N {
942 let mut out = [0u8; N];
943 out.copy_from_slice(raw);
944 return Ok(out);
945 }
946 let first = raw.first().copied().unwrap_or(0u8);
948 let sign_byte = if (first & 0x80) == 0 { 0x00 } else { 0xFF };
949 let mut out = [sign_byte; N];
951 if len > N {
952 let extra = len - N;
955 if raw[..extra].iter().any(|&b| b != sign_byte) {
957 return Err(ArrowError::ParseError(format!(
958 "Decimal value with {} bytes cannot be represented in {} bytes without overflow",
959 len, N
960 )));
961 }
962 if N > 0 {
963 let first_kept = raw[extra];
964 let sign_bit_mismatch = ((first_kept ^ sign_byte) & 0x80) != 0;
965 if sign_bit_mismatch {
966 return Err(ArrowError::ParseError(format!(
967 "Decimal value with {} bytes cannot be represented in {} bytes without overflow",
968 len, N
969 )));
970 }
971 }
972 out.copy_from_slice(&raw[extra..]);
973 return Ok(out);
974 }
975 out[N - len..].copy_from_slice(raw);
976 Ok(out)
977}
978
/// Describes how to advance the cursor past one encoded value without
/// materializing it. Used for writer fields that have no reader counterpart.
#[derive(Debug)]
enum Skipper {
    /// Nothing is read.
    Null,
    Boolean,
    Int32,
    Int64,
    Float32,
    Float64,
    Bytes,
    String,
    Date32,
    TimeMillis,
    TimeMicros,
    TimestampMillis,
    TimestampMicros,
    /// Fixed-width value of the given number of bytes.
    Fixed(usize),
    /// Decimal: `Some(n)` is skipped as `n` fixed bytes, `None` as a byte string.
    Decimal(Option<usize>),
    /// UUID, skipped as a byte string.
    UuidString,
    Enum,
    /// Duration, skipped as 12 fixed bytes.
    DurationFixed12,
    /// Block-encoded list with the skipper for its items.
    List(Box<Skipper>),
    /// Block-encoded map with the skipper for its values (keys are skipped as
    /// byte strings).
    Map(Box<Skipper>),
    /// Record: child skippers applied in order.
    Struct(Vec<Skipper>),
    /// Nullable value: the branch is read first and the inner skipper is
    /// applied only for the non-null branch.
    Nullable(Nullability, Box<Skipper>),
}
1009
1010impl Skipper {
1011 fn from_avro(dt: &AvroDataType) -> Result<Self, ArrowError> {
1012 let mut base = match dt.codec() {
1013 Codec::Null => Self::Null,
1014 Codec::Boolean => Self::Boolean,
1015 Codec::Int32 | Codec::Date32 | Codec::TimeMillis => Self::Int32,
1016 Codec::Int64 => Self::Int64,
1017 Codec::TimeMicros => Self::TimeMicros,
1018 Codec::TimestampMillis(_) => Self::TimestampMillis,
1019 Codec::TimestampMicros(_) => Self::TimestampMicros,
1020 Codec::Float32 => Self::Float32,
1021 Codec::Float64 => Self::Float64,
1022 Codec::Binary => Self::Bytes,
1023 Codec::Utf8 | Codec::Utf8View => Self::String,
1024 Codec::Fixed(sz) => Self::Fixed(*sz as usize),
1025 Codec::Decimal(_, _, size) => Self::Decimal(*size),
1026 Codec::Uuid => Self::UuidString, Codec::Enum(_) => Self::Enum,
1028 Codec::List(item) => Self::List(Box::new(Skipper::from_avro(item)?)),
1029 Codec::Struct(fields) => Self::Struct(
1030 fields
1031 .iter()
1032 .map(|f| Skipper::from_avro(f.data_type()))
1033 .collect::<Result<_, _>>()?,
1034 ),
1035 Codec::Map(values) => Self::Map(Box::new(Skipper::from_avro(values)?)),
1036 Codec::Interval => Self::DurationFixed12,
1037 _ => {
1038 return Err(ArrowError::NotYetImplemented(format!(
1039 "Skipper not implemented for codec {:?}",
1040 dt.codec()
1041 )));
1042 }
1043 };
1044 if let Some(n) = dt.nullability() {
1045 base = Self::Nullable(n, Box::new(base));
1046 }
1047 Ok(base)
1048 }
1049
1050 fn skip(&mut self, buf: &mut AvroCursor<'_>) -> Result<(), ArrowError> {
1051 match self {
1052 Self::Null => Ok(()),
1053 Self::Boolean => {
1054 buf.get_bool()?;
1055 Ok(())
1056 }
1057 Self::Int32 | Self::Date32 | Self::TimeMillis => {
1058 buf.get_int()?;
1059 Ok(())
1060 }
1061 Self::Int64 | Self::TimeMicros | Self::TimestampMillis | Self::TimestampMicros => {
1062 buf.get_long()?;
1063 Ok(())
1064 }
1065 Self::Float32 => {
1066 buf.get_float()?;
1067 Ok(())
1068 }
1069 Self::Float64 => {
1070 buf.get_double()?;
1071 Ok(())
1072 }
1073 Self::Bytes | Self::String | Self::UuidString => {
1074 buf.get_bytes()?;
1075 Ok(())
1076 }
1077 Self::Fixed(sz) => {
1078 buf.get_fixed(*sz)?;
1079 Ok(())
1080 }
1081 Self::Decimal(size) => {
1082 if let Some(s) = size {
1083 buf.get_fixed(*s)
1084 } else {
1085 buf.get_bytes()
1086 }?;
1087 Ok(())
1088 }
1089 Self::Enum => {
1090 buf.get_int()?;
1091 Ok(())
1092 }
1093 Self::DurationFixed12 => {
1094 buf.get_fixed(12)?;
1095 Ok(())
1096 }
1097 Self::List(item) => {
1098 skip_blocks(buf, |c| item.skip(c))?;
1099 Ok(())
1100 }
1101 Self::Map(value) => {
1102 skip_blocks(buf, |c| {
1103 c.get_bytes()?; value.skip(c)
1105 })?;
1106 Ok(())
1107 }
1108 Self::Struct(fields) => {
1109 for f in fields.iter_mut() {
1110 f.skip(buf)?
1111 }
1112 Ok(())
1113 }
1114 Self::Nullable(order, inner) => {
1115 let branch = buf.read_vlq()?;
1116 let is_not_null = match *order {
1117 Nullability::NullFirst => branch != 0,
1118 Nullability::NullSecond => branch == 0,
1119 };
1120 if is_not_null {
1121 inner.skip(buf)?;
1122 }
1123 Ok(())
1124 }
1125 }
1126 }
1127}
1128
1129#[inline]
1130fn build_skip_decoders(
1131 skip_fields: &[Option<AvroDataType>],
1132) -> Result<Vec<Option<Skipper>>, ArrowError> {
1133 skip_fields
1134 .iter()
1135 .map(|opt| opt.as_ref().map(Skipper::from_avro).transpose())
1136 .collect()
1137}
1138
1139#[cfg(test)]
1140mod tests {
1141 use super::*;
1142 use crate::codec::AvroField;
1143 use arrow_array::{
1144 cast::AsArray, Array, Decimal128Array, Decimal256Array, Decimal32Array, DictionaryArray,
1145 FixedSizeBinaryArray, IntervalMonthDayNanoArray, ListArray, MapArray, StringArray,
1146 StructArray,
1147 };
1148
1149 fn encode_avro_int(value: i32) -> Vec<u8> {
1150 let mut buf = Vec::new();
1151 let mut v = (value << 1) ^ (value >> 31);
1152 while v & !0x7F != 0 {
1153 buf.push(((v & 0x7F) | 0x80) as u8);
1154 v >>= 7;
1155 }
1156 buf.push(v as u8);
1157 buf
1158 }
1159
1160 fn encode_avro_long(value: i64) -> Vec<u8> {
1161 let mut buf = Vec::new();
1162 let mut v = (value << 1) ^ (value >> 63);
1163 while v & !0x7F != 0 {
1164 buf.push(((v & 0x7F) | 0x80) as u8);
1165 v >>= 7;
1166 }
1167 buf.push(v as u8);
1168 buf
1169 }
1170
1171 fn encode_avro_bytes(bytes: &[u8]) -> Vec<u8> {
1172 let mut buf = encode_avro_long(bytes.len() as i64);
1173 buf.extend_from_slice(bytes);
1174 buf
1175 }
1176
1177 fn avro_from_codec(codec: Codec) -> AvroDataType {
1178 AvroDataType::new(codec, Default::default(), None)
1179 }
1180
1181 fn decoder_for_promotion(
1182 writer: PrimitiveType,
1183 reader: PrimitiveType,
1184 use_utf8view: bool,
1185 ) -> Decoder {
1186 let ws = Schema::TypeName(TypeName::Primitive(writer));
1187 let rs = Schema::TypeName(TypeName::Primitive(reader));
1188 let field =
1189 AvroField::resolve_from_writer_and_reader(&ws, &rs, use_utf8view, false).unwrap();
1190 Decoder::try_new(field.data_type()).unwrap()
1191 }
1192
1193 #[test]
1194 fn test_schema_resolution_promotion_int_to_long() {
1195 let mut dec = decoder_for_promotion(PrimitiveType::Int, PrimitiveType::Long, false);
1196 assert!(matches!(dec, Decoder::Int32ToInt64(_)));
1197 for v in [0, 1, -2, 123456] {
1198 let data = encode_avro_int(v);
1199 let mut cur = AvroCursor::new(&data);
1200 dec.decode(&mut cur).unwrap();
1201 }
1202 let arr = dec.flush(None).unwrap();
1203 let a = arr.as_any().downcast_ref::<Int64Array>().unwrap();
1204 assert_eq!(a.value(0), 0);
1205 assert_eq!(a.value(1), 1);
1206 assert_eq!(a.value(2), -2);
1207 assert_eq!(a.value(3), 123456);
1208 }
1209
1210 #[test]
1211 fn test_schema_resolution_promotion_int_to_float() {
1212 let mut dec = decoder_for_promotion(PrimitiveType::Int, PrimitiveType::Float, false);
1213 assert!(matches!(dec, Decoder::Int32ToFloat32(_)));
1214 for v in [0, 42, -7] {
1215 let data = encode_avro_int(v);
1216 let mut cur = AvroCursor::new(&data);
1217 dec.decode(&mut cur).unwrap();
1218 }
1219 let arr = dec.flush(None).unwrap();
1220 let a = arr.as_any().downcast_ref::<Float32Array>().unwrap();
1221 assert_eq!(a.value(0), 0.0);
1222 assert_eq!(a.value(1), 42.0);
1223 assert_eq!(a.value(2), -7.0);
1224 }
1225
1226 #[test]
1227 fn test_schema_resolution_promotion_int_to_double() {
1228 let mut dec = decoder_for_promotion(PrimitiveType::Int, PrimitiveType::Double, false);
1229 assert!(matches!(dec, Decoder::Int32ToFloat64(_)));
1230 for v in [1, -1, 10_000] {
1231 let data = encode_avro_int(v);
1232 let mut cur = AvroCursor::new(&data);
1233 dec.decode(&mut cur).unwrap();
1234 }
1235 let arr = dec.flush(None).unwrap();
1236 let a = arr.as_any().downcast_ref::<Float64Array>().unwrap();
1237 assert_eq!(a.value(0), 1.0);
1238 assert_eq!(a.value(1), -1.0);
1239 assert_eq!(a.value(2), 10_000.0);
1240 }
1241
1242 #[test]
1243 fn test_schema_resolution_promotion_long_to_float() {
1244 let mut dec = decoder_for_promotion(PrimitiveType::Long, PrimitiveType::Float, false);
1245 assert!(matches!(dec, Decoder::Int64ToFloat32(_)));
1246 for v in [0_i64, 1_000_000_i64, -123_i64] {
1247 let data = encode_avro_long(v);
1248 let mut cur = AvroCursor::new(&data);
1249 dec.decode(&mut cur).unwrap();
1250 }
1251 let arr = dec.flush(None).unwrap();
1252 let a = arr.as_any().downcast_ref::<Float32Array>().unwrap();
1253 assert_eq!(a.value(0), 0.0);
1254 assert_eq!(a.value(1), 1_000_000.0);
1255 assert_eq!(a.value(2), -123.0);
1256 }
1257
1258 #[test]
1259 fn test_schema_resolution_promotion_long_to_double() {
1260 let mut dec = decoder_for_promotion(PrimitiveType::Long, PrimitiveType::Double, false);
1261 assert!(matches!(dec, Decoder::Int64ToFloat64(_)));
1262 for v in [2_i64, -2_i64, 9_223_372_i64] {
1263 let data = encode_avro_long(v);
1264 let mut cur = AvroCursor::new(&data);
1265 dec.decode(&mut cur).unwrap();
1266 }
1267 let arr = dec.flush(None).unwrap();
1268 let a = arr.as_any().downcast_ref::<Float64Array>().unwrap();
1269 assert_eq!(a.value(0), 2.0);
1270 assert_eq!(a.value(1), -2.0);
1271 assert_eq!(a.value(2), 9_223_372.0);
1272 }
1273
1274 #[test]
1275 fn test_schema_resolution_promotion_float_to_double() {
1276 let mut dec = decoder_for_promotion(PrimitiveType::Float, PrimitiveType::Double, false);
1277 assert!(matches!(dec, Decoder::Float32ToFloat64(_)));
1278 for v in [0.5_f32, -3.25_f32, 1.0e6_f32] {
1279 let data = v.to_le_bytes().to_vec();
1280 let mut cur = AvroCursor::new(&data);
1281 dec.decode(&mut cur).unwrap();
1282 }
1283 let arr = dec.flush(None).unwrap();
1284 let a = arr.as_any().downcast_ref::<Float64Array>().unwrap();
1285 assert_eq!(a.value(0), 0.5_f64);
1286 assert_eq!(a.value(1), -3.25_f64);
1287 assert_eq!(a.value(2), 1.0e6_f64);
1288 }
1289
1290 #[test]
1291 fn test_schema_resolution_promotion_bytes_to_string_utf8() {
1292 let mut dec = decoder_for_promotion(PrimitiveType::Bytes, PrimitiveType::String, false);
1293 assert!(matches!(dec, Decoder::BytesToString(_, _)));
1294 for s in ["hello", "world", "héllo"] {
1295 let data = encode_avro_bytes(s.as_bytes());
1296 let mut cur = AvroCursor::new(&data);
1297 dec.decode(&mut cur).unwrap();
1298 }
1299 let arr = dec.flush(None).unwrap();
1300 let a = arr.as_any().downcast_ref::<StringArray>().unwrap();
1301 assert_eq!(a.value(0), "hello");
1302 assert_eq!(a.value(1), "world");
1303 assert_eq!(a.value(2), "héllo");
1304 }
1305
1306 #[test]
1307 fn test_schema_resolution_promotion_bytes_to_string_utf8view_enabled() {
1308 let mut dec = decoder_for_promotion(PrimitiveType::Bytes, PrimitiveType::String, true);
1309 assert!(matches!(dec, Decoder::BytesToString(_, _)));
1310 let data = encode_avro_bytes("abc".as_bytes());
1311 let mut cur = AvroCursor::new(&data);
1312 dec.decode(&mut cur).unwrap();
1313 let arr = dec.flush(None).unwrap();
1314 let a = arr.as_any().downcast_ref::<StringArray>().unwrap();
1315 assert_eq!(a.value(0), "abc");
1316 }
1317
1318 #[test]
1319 fn test_schema_resolution_promotion_string_to_bytes() {
1320 let mut dec = decoder_for_promotion(PrimitiveType::String, PrimitiveType::Bytes, false);
1321 assert!(matches!(dec, Decoder::StringToBytes(_, _)));
1322 for s in ["", "abc", "data"] {
1323 let data = encode_avro_bytes(s.as_bytes());
1324 let mut cur = AvroCursor::new(&data);
1325 dec.decode(&mut cur).unwrap();
1326 }
1327 let arr = dec.flush(None).unwrap();
1328 let a = arr.as_any().downcast_ref::<BinaryArray>().unwrap();
1329 assert_eq!(a.value(0), b"");
1330 assert_eq!(a.value(1), b"abc");
1331 assert_eq!(a.value(2), "data".as_bytes());
1332 }
1333
1334 #[test]
1335 fn test_schema_resolution_no_promotion_passthrough_int() {
1336 let ws = Schema::TypeName(TypeName::Primitive(PrimitiveType::Int));
1337 let rs = Schema::TypeName(TypeName::Primitive(PrimitiveType::Int));
1338 let field = AvroField::resolve_from_writer_and_reader(&ws, &rs, false, false).unwrap();
1339 let mut dec = Decoder::try_new(field.data_type()).unwrap();
1340 assert!(matches!(dec, Decoder::Int32(_)));
1341 for v in [7, -9] {
1342 let data = encode_avro_int(v);
1343 let mut cur = AvroCursor::new(&data);
1344 dec.decode(&mut cur).unwrap();
1345 }
1346 let arr = dec.flush(None).unwrap();
1347 let a = arr.as_any().downcast_ref::<Int32Array>().unwrap();
1348 assert_eq!(a.value(0), 7);
1349 assert_eq!(a.value(1), -9);
1350 }
1351
1352 #[test]
1353 fn test_schema_resolution_illegal_promotion_int_to_boolean_errors() {
1354 let ws = Schema::TypeName(TypeName::Primitive(PrimitiveType::Int));
1355 let rs = Schema::TypeName(TypeName::Primitive(PrimitiveType::Boolean));
1356 let res = AvroField::resolve_from_writer_and_reader(&ws, &rs, false, false);
1357 assert!(res.is_err(), "expected error for illegal promotion");
1358 }
1359
1360 #[test]
1361 fn test_map_decoding_one_entry() {
1362 let value_type = avro_from_codec(Codec::Utf8);
1363 let map_type = avro_from_codec(Codec::Map(Arc::new(value_type)));
1364 let mut decoder = Decoder::try_new(&map_type).unwrap();
1365 let mut data = Vec::new();
1367 data.extend_from_slice(&encode_avro_long(1));
1368 data.extend_from_slice(&encode_avro_bytes(b"hello")); data.extend_from_slice(&encode_avro_bytes(b"world")); data.extend_from_slice(&encode_avro_long(0));
1371 let mut cursor = AvroCursor::new(&data);
1372 decoder.decode(&mut cursor).unwrap();
1373 let array = decoder.flush(None).unwrap();
1374 let map_arr = array.as_any().downcast_ref::<MapArray>().unwrap();
1375 assert_eq!(map_arr.len(), 1); assert_eq!(map_arr.value_length(0), 1);
1377 let entries = map_arr.value(0);
1378 let struct_entries = entries.as_any().downcast_ref::<StructArray>().unwrap();
1379 assert_eq!(struct_entries.len(), 1);
1380 let key_arr = struct_entries
1381 .column_by_name("key")
1382 .unwrap()
1383 .as_any()
1384 .downcast_ref::<StringArray>()
1385 .unwrap();
1386 let val_arr = struct_entries
1387 .column_by_name("value")
1388 .unwrap()
1389 .as_any()
1390 .downcast_ref::<StringArray>()
1391 .unwrap();
1392 assert_eq!(key_arr.value(0), "hello");
1393 assert_eq!(val_arr.value(0), "world");
1394 }
1395
1396 #[test]
1397 fn test_map_decoding_empty() {
1398 let value_type = avro_from_codec(Codec::Utf8);
1399 let map_type = avro_from_codec(Codec::Map(Arc::new(value_type)));
1400 let mut decoder = Decoder::try_new(&map_type).unwrap();
1401 let data = encode_avro_long(0);
1402 decoder.decode(&mut AvroCursor::new(&data)).unwrap();
1403 let array = decoder.flush(None).unwrap();
1404 let map_arr = array.as_any().downcast_ref::<MapArray>().unwrap();
1405 assert_eq!(map_arr.len(), 1);
1406 assert_eq!(map_arr.value_length(0), 0);
1407 }
1408
1409 #[test]
1410 fn test_fixed_decoding() {
1411 let avro_type = avro_from_codec(Codec::Fixed(3));
1412 let mut decoder = Decoder::try_new(&avro_type).expect("Failed to create decoder");
1413
1414 let data1 = [1u8, 2, 3];
1415 let mut cursor1 = AvroCursor::new(&data1);
1416 decoder
1417 .decode(&mut cursor1)
1418 .expect("Failed to decode data1");
1419 assert_eq!(cursor1.position(), 3, "Cursor should advance by fixed size");
1420 let data2 = [4u8, 5, 6];
1421 let mut cursor2 = AvroCursor::new(&data2);
1422 decoder
1423 .decode(&mut cursor2)
1424 .expect("Failed to decode data2");
1425 assert_eq!(cursor2.position(), 3, "Cursor should advance by fixed size");
1426 let array = decoder.flush(None).expect("Failed to flush decoder");
1427 assert_eq!(array.len(), 2, "Array should contain two items");
1428 let fixed_size_binary_array = array
1429 .as_any()
1430 .downcast_ref::<FixedSizeBinaryArray>()
1431 .expect("Failed to downcast to FixedSizeBinaryArray");
1432 assert_eq!(
1433 fixed_size_binary_array.value_length(),
1434 3,
1435 "Fixed size of binary values should be 3"
1436 );
1437 assert_eq!(
1438 fixed_size_binary_array.value(0),
1439 &[1, 2, 3],
1440 "First item mismatch"
1441 );
1442 assert_eq!(
1443 fixed_size_binary_array.value(1),
1444 &[4, 5, 6],
1445 "Second item mismatch"
1446 );
1447 }
1448
1449 #[test]
1450 fn test_fixed_decoding_empty() {
1451 let avro_type = avro_from_codec(Codec::Fixed(5));
1452 let mut decoder = Decoder::try_new(&avro_type).expect("Failed to create decoder");
1453
1454 let array = decoder
1455 .flush(None)
1456 .expect("Failed to flush decoder for empty input");
1457
1458 assert_eq!(array.len(), 0, "Array should be empty");
1459 let fixed_size_binary_array = array
1460 .as_any()
1461 .downcast_ref::<FixedSizeBinaryArray>()
1462 .expect("Failed to downcast to FixedSizeBinaryArray for empty array");
1463
1464 assert_eq!(
1465 fixed_size_binary_array.value_length(),
1466 5,
1467 "Fixed size of binary values should be 5 as per type"
1468 );
1469 }
1470
1471 #[test]
1472 fn test_uuid_decoding() {
1473 let avro_type = avro_from_codec(Codec::Uuid);
1474 let mut decoder = Decoder::try_new(&avro_type).expect("Failed to create decoder");
1475 let uuid_str = "f81d4fae-7dec-11d0-a765-00a0c91e6bf6";
1476 let data = encode_avro_bytes(uuid_str.as_bytes());
1477 let mut cursor = AvroCursor::new(&data);
1478 decoder.decode(&mut cursor).expect("Failed to decode data");
1479 assert_eq!(
1480 cursor.position(),
1481 data.len(),
1482 "Cursor should advance by varint size + data size"
1483 );
1484 let array = decoder.flush(None).expect("Failed to flush decoder");
1485 let fixed_size_binary_array = array
1486 .as_any()
1487 .downcast_ref::<FixedSizeBinaryArray>()
1488 .expect("Array should be a FixedSizeBinaryArray");
1489 assert_eq!(fixed_size_binary_array.len(), 1);
1490 assert_eq!(fixed_size_binary_array.value_length(), 16);
1491 let expected_bytes = [
1492 0xf8, 0x1d, 0x4f, 0xae, 0x7d, 0xec, 0x11, 0xd0, 0xa7, 0x65, 0x00, 0xa0, 0xc9, 0x1e,
1493 0x6b, 0xf6,
1494 ];
1495 assert_eq!(fixed_size_binary_array.value(0), &expected_bytes);
1496 }
1497
1498 #[test]
1499 fn test_array_decoding() {
1500 let item_dt = avro_from_codec(Codec::Int32);
1501 let list_dt = avro_from_codec(Codec::List(Arc::new(item_dt)));
1502 let mut decoder = Decoder::try_new(&list_dt).unwrap();
1503 let mut row1 = Vec::new();
1504 row1.extend_from_slice(&encode_avro_long(2));
1505 row1.extend_from_slice(&encode_avro_int(10));
1506 row1.extend_from_slice(&encode_avro_int(20));
1507 row1.extend_from_slice(&encode_avro_long(0));
1508 let row2 = encode_avro_long(0);
1509 let mut cursor = AvroCursor::new(&row1);
1510 decoder.decode(&mut cursor).unwrap();
1511 let mut cursor2 = AvroCursor::new(&row2);
1512 decoder.decode(&mut cursor2).unwrap();
1513 let array = decoder.flush(None).unwrap();
1514 let list_arr = array.as_any().downcast_ref::<ListArray>().unwrap();
1515 assert_eq!(list_arr.len(), 2);
1516 let offsets = list_arr.value_offsets();
1517 assert_eq!(offsets, &[0, 2, 2]);
1518 let values = list_arr.values();
1519 let int_arr = values.as_primitive::<Int32Type>();
1520 assert_eq!(int_arr.len(), 2);
1521 assert_eq!(int_arr.value(0), 10);
1522 assert_eq!(int_arr.value(1), 20);
1523 }
1524
1525 #[test]
1526 fn test_array_decoding_with_negative_block_count() {
1527 let item_dt = avro_from_codec(Codec::Int32);
1528 let list_dt = avro_from_codec(Codec::List(Arc::new(item_dt)));
1529 let mut decoder = Decoder::try_new(&list_dt).unwrap();
1530 let mut data = encode_avro_long(-3);
1531 data.extend_from_slice(&encode_avro_long(12));
1532 data.extend_from_slice(&encode_avro_int(1));
1533 data.extend_from_slice(&encode_avro_int(2));
1534 data.extend_from_slice(&encode_avro_int(3));
1535 data.extend_from_slice(&encode_avro_long(0));
1536 let mut cursor = AvroCursor::new(&data);
1537 decoder.decode(&mut cursor).unwrap();
1538 let array = decoder.flush(None).unwrap();
1539 let list_arr = array.as_any().downcast_ref::<ListArray>().unwrap();
1540 assert_eq!(list_arr.len(), 1);
1541 assert_eq!(list_arr.value_length(0), 3);
1542 let values = list_arr.values().as_primitive::<Int32Type>();
1543 assert_eq!(values.len(), 3);
1544 assert_eq!(values.value(0), 1);
1545 assert_eq!(values.value(1), 2);
1546 assert_eq!(values.value(2), 3);
1547 }
1548
1549 #[test]
1550 fn test_nested_array_decoding() {
1551 let inner_ty = avro_from_codec(Codec::List(Arc::new(avro_from_codec(Codec::Int32))));
1552 let nested_ty = avro_from_codec(Codec::List(Arc::new(inner_ty.clone())));
1553 let mut decoder = Decoder::try_new(&nested_ty).unwrap();
1554 let mut buf = Vec::new();
1555 buf.extend(encode_avro_long(1));
1556 buf.extend(encode_avro_long(2));
1557 buf.extend(encode_avro_int(5));
1558 buf.extend(encode_avro_int(6));
1559 buf.extend(encode_avro_long(0));
1560 buf.extend(encode_avro_long(0));
1561 let mut cursor = AvroCursor::new(&buf);
1562 decoder.decode(&mut cursor).unwrap();
1563 let arr = decoder.flush(None).unwrap();
1564 let outer = arr.as_any().downcast_ref::<ListArray>().unwrap();
1565 assert_eq!(outer.len(), 1);
1566 assert_eq!(outer.value_length(0), 1);
1567 let inner = outer.values().as_any().downcast_ref::<ListArray>().unwrap();
1568 assert_eq!(inner.len(), 1);
1569 assert_eq!(inner.value_length(0), 2);
1570 let values = inner
1571 .values()
1572 .as_any()
1573 .downcast_ref::<Int32Array>()
1574 .unwrap();
1575 assert_eq!(values.values(), &[5, 6]);
1576 }
1577
1578 #[test]
1579 fn test_array_decoding_empty_array() {
1580 let value_type = avro_from_codec(Codec::Utf8);
1581 let map_type = avro_from_codec(Codec::List(Arc::new(value_type)));
1582 let mut decoder = Decoder::try_new(&map_type).unwrap();
1583 let data = encode_avro_long(0);
1584 decoder.decode(&mut AvroCursor::new(&data)).unwrap();
1585 let array = decoder.flush(None).unwrap();
1586 let list_arr = array.as_any().downcast_ref::<ListArray>().unwrap();
1587 assert_eq!(list_arr.len(), 1);
1588 assert_eq!(list_arr.value_length(0), 0);
1589 }
1590
1591 #[test]
1592 fn test_decimal_decoding_fixed256() {
1593 let dt = avro_from_codec(Codec::Decimal(50, Some(2), Some(32)));
1594 let mut decoder = Decoder::try_new(&dt).unwrap();
1595 let row1 = [
1596 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
1597 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
1598 0x00, 0x00, 0x30, 0x39,
1599 ];
1600 let row2 = [
1601 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
1602 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
1603 0xFF, 0xFF, 0xFF, 0x85,
1604 ];
1605 let mut data = Vec::new();
1606 data.extend_from_slice(&row1);
1607 data.extend_from_slice(&row2);
1608 let mut cursor = AvroCursor::new(&data);
1609 decoder.decode(&mut cursor).unwrap();
1610 decoder.decode(&mut cursor).unwrap();
1611 let arr = decoder.flush(None).unwrap();
1612 let dec = arr.as_any().downcast_ref::<Decimal256Array>().unwrap();
1613 assert_eq!(dec.len(), 2);
1614 assert_eq!(dec.value_as_string(0), "123.45");
1615 assert_eq!(dec.value_as_string(1), "-1.23");
1616 }
1617
1618 #[test]
1619 fn test_decimal_decoding_fixed128() {
1620 let dt = avro_from_codec(Codec::Decimal(28, Some(2), Some(16)));
1621 let mut decoder = Decoder::try_new(&dt).unwrap();
1622 let row1 = [
1623 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
1624 0x30, 0x39,
1625 ];
1626 let row2 = [
1627 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
1628 0xFF, 0x85,
1629 ];
1630 let mut data = Vec::new();
1631 data.extend_from_slice(&row1);
1632 data.extend_from_slice(&row2);
1633 let mut cursor = AvroCursor::new(&data);
1634 decoder.decode(&mut cursor).unwrap();
1635 decoder.decode(&mut cursor).unwrap();
1636 let arr = decoder.flush(None).unwrap();
1637 let dec = arr.as_any().downcast_ref::<Decimal128Array>().unwrap();
1638 assert_eq!(dec.len(), 2);
1639 assert_eq!(dec.value_as_string(0), "123.45");
1640 assert_eq!(dec.value_as_string(1), "-1.23");
1641 }
1642
1643 #[test]
1644 fn test_decimal_decoding_fixed32_from_32byte_fixed_storage() {
1645 let dt = avro_from_codec(Codec::Decimal(5, Some(2), Some(32)));
1646 let mut decoder = Decoder::try_new(&dt).unwrap();
1647 let row1 = [
1648 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
1649 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
1650 0x00, 0x00, 0x30, 0x39,
1651 ];
1652 let row2 = [
1653 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
1654 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
1655 0xFF, 0xFF, 0xFF, 0x85,
1656 ];
1657 let mut data = Vec::new();
1658 data.extend_from_slice(&row1);
1659 data.extend_from_slice(&row2);
1660 let mut cursor = AvroCursor::new(&data);
1661 decoder.decode(&mut cursor).unwrap();
1662 decoder.decode(&mut cursor).unwrap();
1663 let arr = decoder.flush(None).unwrap();
1664 #[cfg(feature = "small_decimals")]
1665 {
1666 let dec = arr.as_any().downcast_ref::<Decimal32Array>().unwrap();
1667 assert_eq!(dec.len(), 2);
1668 assert_eq!(dec.value_as_string(0), "123.45");
1669 assert_eq!(dec.value_as_string(1), "-1.23");
1670 }
1671 #[cfg(not(feature = "small_decimals"))]
1672 {
1673 let dec = arr.as_any().downcast_ref::<Decimal128Array>().unwrap();
1674 assert_eq!(dec.len(), 2);
1675 assert_eq!(dec.value_as_string(0), "123.45");
1676 assert_eq!(dec.value_as_string(1), "-1.23");
1677 }
1678 }
1679
1680 #[test]
1681 fn test_decimal_decoding_fixed32_from_16byte_fixed_storage() {
1682 let dt = avro_from_codec(Codec::Decimal(5, Some(2), Some(16)));
1683 let mut decoder = Decoder::try_new(&dt).unwrap();
1684 let row1 = [
1685 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
1686 0x30, 0x39,
1687 ];
1688 let row2 = [
1689 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
1690 0xFF, 0x85,
1691 ];
1692 let mut data = Vec::new();
1693 data.extend_from_slice(&row1);
1694 data.extend_from_slice(&row2);
1695 let mut cursor = AvroCursor::new(&data);
1696 decoder.decode(&mut cursor).unwrap();
1697 decoder.decode(&mut cursor).unwrap();
1698
1699 let arr = decoder.flush(None).unwrap();
1700 #[cfg(feature = "small_decimals")]
1701 {
1702 let dec = arr.as_any().downcast_ref::<Decimal32Array>().unwrap();
1703 assert_eq!(dec.len(), 2);
1704 assert_eq!(dec.value_as_string(0), "123.45");
1705 assert_eq!(dec.value_as_string(1), "-1.23");
1706 }
1707 #[cfg(not(feature = "small_decimals"))]
1708 {
1709 let dec = arr.as_any().downcast_ref::<Decimal128Array>().unwrap();
1710 assert_eq!(dec.len(), 2);
1711 assert_eq!(dec.value_as_string(0), "123.45");
1712 assert_eq!(dec.value_as_string(1), "-1.23");
1713 }
1714 }
1715
1716 #[test]
1717 fn test_decimal_decoding_bytes_with_nulls() {
1718 let dt = avro_from_codec(Codec::Decimal(4, Some(1), None));
1719 let inner = Decoder::try_new(&dt).unwrap();
1720 let mut decoder = Decoder::Nullable(
1721 Nullability::NullSecond,
1722 NullBufferBuilder::new(DEFAULT_CAPACITY),
1723 Box::new(inner),
1724 );
1725 let mut data = Vec::new();
1726 data.extend_from_slice(&encode_avro_int(0));
1727 data.extend_from_slice(&encode_avro_bytes(&[0x04, 0xD2]));
1728 data.extend_from_slice(&encode_avro_int(1));
1729 data.extend_from_slice(&encode_avro_int(0));
1730 data.extend_from_slice(&encode_avro_bytes(&[0xFB, 0x2E]));
1731 let mut cursor = AvroCursor::new(&data);
1732 decoder.decode(&mut cursor).unwrap();
1733 decoder.decode(&mut cursor).unwrap();
1734 decoder.decode(&mut cursor).unwrap();
1735 let arr = decoder.flush(None).unwrap();
1736 #[cfg(feature = "small_decimals")]
1737 {
1738 let dec_arr = arr.as_any().downcast_ref::<Decimal32Array>().unwrap();
1739 assert_eq!(dec_arr.len(), 3);
1740 assert!(dec_arr.is_valid(0));
1741 assert!(!dec_arr.is_valid(1));
1742 assert!(dec_arr.is_valid(2));
1743 assert_eq!(dec_arr.value_as_string(0), "123.4");
1744 assert_eq!(dec_arr.value_as_string(2), "-123.4");
1745 }
1746 #[cfg(not(feature = "small_decimals"))]
1747 {
1748 let dec_arr = arr.as_any().downcast_ref::<Decimal128Array>().unwrap();
1749 assert_eq!(dec_arr.len(), 3);
1750 assert!(dec_arr.is_valid(0));
1751 assert!(!dec_arr.is_valid(1));
1752 assert!(dec_arr.is_valid(2));
1753 assert_eq!(dec_arr.value_as_string(0), "123.4");
1754 assert_eq!(dec_arr.value_as_string(2), "-123.4");
1755 }
1756 }
1757
1758 #[test]
1759 fn test_decimal_decoding_bytes_with_nulls_fixed_size_narrow_result() {
1760 let dt = avro_from_codec(Codec::Decimal(6, Some(2), Some(16)));
1761 let inner = Decoder::try_new(&dt).unwrap();
1762 let mut decoder = Decoder::Nullable(
1763 Nullability::NullSecond,
1764 NullBufferBuilder::new(DEFAULT_CAPACITY),
1765 Box::new(inner),
1766 );
1767 let row1 = [
1768 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01,
1769 0xE2, 0x40,
1770 ];
1771 let row3 = [
1772 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFE,
1773 0x1D, 0xC0,
1774 ];
1775 let mut data = Vec::new();
1776 data.extend_from_slice(&encode_avro_int(0));
1777 data.extend_from_slice(&row1);
1778 data.extend_from_slice(&encode_avro_int(1));
1779 data.extend_from_slice(&encode_avro_int(0));
1780 data.extend_from_slice(&row3);
1781 let mut cursor = AvroCursor::new(&data);
1782 decoder.decode(&mut cursor).unwrap();
1783 decoder.decode(&mut cursor).unwrap();
1784 decoder.decode(&mut cursor).unwrap();
1785 let arr = decoder.flush(None).unwrap();
1786 #[cfg(feature = "small_decimals")]
1787 {
1788 let dec_arr = arr.as_any().downcast_ref::<Decimal32Array>().unwrap();
1789 assert_eq!(dec_arr.len(), 3);
1790 assert!(dec_arr.is_valid(0));
1791 assert!(!dec_arr.is_valid(1));
1792 assert!(dec_arr.is_valid(2));
1793 assert_eq!(dec_arr.value_as_string(0), "1234.56");
1794 assert_eq!(dec_arr.value_as_string(2), "-1234.56");
1795 }
1796 #[cfg(not(feature = "small_decimals"))]
1797 {
1798 let dec_arr = arr.as_any().downcast_ref::<Decimal128Array>().unwrap();
1799 assert_eq!(dec_arr.len(), 3);
1800 assert!(dec_arr.is_valid(0));
1801 assert!(!dec_arr.is_valid(1));
1802 assert!(dec_arr.is_valid(2));
1803 assert_eq!(dec_arr.value_as_string(0), "1234.56");
1804 assert_eq!(dec_arr.value_as_string(2), "-1234.56");
1805 }
1806 }
1807
1808 #[test]
1809 fn test_enum_decoding() {
1810 let symbols: Arc<[String]> = vec!["A", "B", "C"].into_iter().map(String::from).collect();
1811 let avro_type = avro_from_codec(Codec::Enum(symbols.clone()));
1812 let mut decoder = Decoder::try_new(&avro_type).unwrap();
1813 let mut data = Vec::new();
1814 data.extend_from_slice(&encode_avro_int(2));
1815 data.extend_from_slice(&encode_avro_int(0));
1816 data.extend_from_slice(&encode_avro_int(1));
1817 let mut cursor = AvroCursor::new(&data);
1818 decoder.decode(&mut cursor).unwrap();
1819 decoder.decode(&mut cursor).unwrap();
1820 decoder.decode(&mut cursor).unwrap();
1821 let array = decoder.flush(None).unwrap();
1822 let dict_array = array
1823 .as_any()
1824 .downcast_ref::<DictionaryArray<Int32Type>>()
1825 .unwrap();
1826 assert_eq!(dict_array.len(), 3);
1827 let values = dict_array
1828 .values()
1829 .as_any()
1830 .downcast_ref::<StringArray>()
1831 .unwrap();
1832 assert_eq!(values.value(0), "A");
1833 assert_eq!(values.value(1), "B");
1834 assert_eq!(values.value(2), "C");
1835 assert_eq!(dict_array.keys().values(), &[2, 0, 1]);
1836 }
1837
1838 #[test]
1839 fn test_enum_decoding_with_nulls() {
1840 let symbols: Arc<[String]> = vec!["X", "Y"].into_iter().map(String::from).collect();
1841 let enum_codec = Codec::Enum(symbols.clone());
1842 let avro_type =
1843 AvroDataType::new(enum_codec, Default::default(), Some(Nullability::NullFirst));
1844 let mut decoder = Decoder::try_new(&avro_type).unwrap();
1845 let mut data = Vec::new();
1846 data.extend_from_slice(&encode_avro_long(1));
1847 data.extend_from_slice(&encode_avro_int(1));
1848 data.extend_from_slice(&encode_avro_long(0));
1849 data.extend_from_slice(&encode_avro_long(1));
1850 data.extend_from_slice(&encode_avro_int(0));
1851 let mut cursor = AvroCursor::new(&data);
1852 decoder.decode(&mut cursor).unwrap();
1853 decoder.decode(&mut cursor).unwrap();
1854 decoder.decode(&mut cursor).unwrap();
1855 let array = decoder.flush(None).unwrap();
1856 let dict_array = array
1857 .as_any()
1858 .downcast_ref::<DictionaryArray<Int32Type>>()
1859 .unwrap();
1860 assert_eq!(dict_array.len(), 3);
1861 assert!(dict_array.is_valid(0));
1862 assert!(dict_array.is_null(1));
1863 assert!(dict_array.is_valid(2));
1864 let expected_keys = Int32Array::from(vec![Some(1), None, Some(0)]);
1865 assert_eq!(dict_array.keys(), &expected_keys);
1866 let values = dict_array
1867 .values()
1868 .as_any()
1869 .downcast_ref::<StringArray>()
1870 .unwrap();
1871 assert_eq!(values.value(0), "X");
1872 assert_eq!(values.value(1), "Y");
1873 }
1874
1875 #[test]
1876 fn test_duration_decoding_with_nulls() {
1877 let duration_codec = Codec::Interval;
1878 let avro_type = AvroDataType::new(
1879 duration_codec,
1880 Default::default(),
1881 Some(Nullability::NullFirst),
1882 );
1883 let mut decoder = Decoder::try_new(&avro_type).unwrap();
1884 let mut data = Vec::new();
1885 data.extend_from_slice(&encode_avro_long(1)); let mut duration1 = Vec::new();
1888 duration1.extend_from_slice(&1u32.to_le_bytes());
1889 duration1.extend_from_slice(&2u32.to_le_bytes());
1890 duration1.extend_from_slice(&3u32.to_le_bytes());
1891 data.extend_from_slice(&duration1);
1892 data.extend_from_slice(&encode_avro_long(0)); data.extend_from_slice(&encode_avro_long(1)); let mut duration2 = Vec::new();
1896 duration2.extend_from_slice(&4u32.to_le_bytes());
1897 duration2.extend_from_slice(&5u32.to_le_bytes());
1898 duration2.extend_from_slice(&6u32.to_le_bytes());
1899 data.extend_from_slice(&duration2);
1900 let mut cursor = AvroCursor::new(&data);
1901 decoder.decode(&mut cursor).unwrap();
1902 decoder.decode(&mut cursor).unwrap();
1903 decoder.decode(&mut cursor).unwrap();
1904 let array = decoder.flush(None).unwrap();
1905 let interval_array = array
1906 .as_any()
1907 .downcast_ref::<IntervalMonthDayNanoArray>()
1908 .unwrap();
1909 assert_eq!(interval_array.len(), 3);
1910 assert!(interval_array.is_valid(0));
1911 assert!(interval_array.is_null(1));
1912 assert!(interval_array.is_valid(2));
1913 let expected = IntervalMonthDayNanoArray::from(vec![
1914 Some(IntervalMonthDayNano {
1915 months: 1,
1916 days: 2,
1917 nanoseconds: 3_000_000,
1918 }),
1919 None,
1920 Some(IntervalMonthDayNano {
1921 months: 4,
1922 days: 5,
1923 nanoseconds: 6_000_000,
1924 }),
1925 ]);
1926 assert_eq!(interval_array, &expected);
1927 }
1928
1929 #[test]
1930 fn test_duration_decoding_empty() {
1931 let duration_codec = Codec::Interval;
1932 let avro_type = AvroDataType::new(duration_codec, Default::default(), None);
1933 let mut decoder = Decoder::try_new(&avro_type).unwrap();
1934 let array = decoder.flush(None).unwrap();
1935 assert_eq!(array.len(), 0);
1936 }
1937
#[test]
fn test_nullable_decode_error_bitmap_corruption() {
    let nullable_int = AvroDataType::new(
        Codec::Int32,
        Default::default(),
        Some(Nullability::NullSecond),
    );
    let mut dec = Decoder::try_new(&nullable_int).unwrap();

    // Branch tag 1 with no payload; asserted below to come out as a null.
    let null_row = encode_avro_int(1);
    // Branch tag 0 with nothing after it; decoding this row is expected to fail.
    let truncated_row = encode_avro_int(0);
    // Branch tag 0 followed by the int 42.
    let value_row = [encode_avro_int(0), encode_avro_int(42)].concat();

    dec.decode(&mut AvroCursor::new(&null_row)).unwrap();
    assert!(dec.decode(&mut AvroCursor::new(&truncated_row)).is_err());
    dec.decode(&mut AvroCursor::new(&value_row)).unwrap();

    let flushed = dec.flush(None).unwrap();

    // The failed row must leave nothing behind: only the null and the 42 remain,
    // and each still lines up with its own validity bit.
    assert_eq!(flushed.len(), 2);
    let ints = flushed.as_any().downcast_ref::<Int32Array>().unwrap();
    assert!(ints.is_null(0));
    assert_eq!(ints.value(1), 42);
}
1973
#[test]
fn test_enum_mapping_reordered_symbols() {
    // The reader lists its symbols as [B, C, A]. Writer indices 0, 1 and 2 are
    // decoded below, and the test expects them to surface as the dictionary
    // keys listed in `mapping`.
    let reader_symbols: Arc<[String]> =
        vec!["B".to_string(), "C".to_string(), "A".to_string()].into();
    let mapping: Arc<[i32]> = Arc::from(vec![2, 0, 1]);
    let default_index: i32 = -1;
    let mut dec = Decoder::EnumResolved {
        indices: Vec::with_capacity(DEFAULT_CAPACITY),
        // Moved rather than cloned: the symbol table is not used again here.
        symbols: reader_symbols,
        mapping,
        default_index,
    };
    let mut data = Vec::new();
    for writer_index in [0, 1, 2] {
        data.extend_from_slice(&encode_avro_int(writer_index));
    }
    let mut cur = AvroCursor::new(&data);
    for _ in 0..3 {
        dec.decode(&mut cur).unwrap();
    }
    let arr = dec.flush(None).unwrap();
    let dict = arr
        .as_any()
        .downcast_ref::<DictionaryArray<Int32Type>>()
        .unwrap();
    let expected_keys = Int32Array::from(vec![2, 0, 1]);
    assert_eq!(dict.keys(), &expected_keys);
    // The dictionary values must be the reader's symbols, in reader order.
    let values = dict
        .values()
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    for (slot, symbol) in ["B", "C", "A"].into_iter().enumerate() {
        assert_eq!(values.value(slot), symbol);
    }
}
2010
#[test]
fn test_enum_mapping_unknown_symbol_and_out_of_range_fall_back_to_default() {
    // Reader symbols are [A, B] with B (index 1) as the default. The writer
    // indices 0 and 1 are covered by `mapping`; 99 lies outside it and the
    // test expects it to be replaced by `default_index`.
    let reader_symbols: Arc<[String]> = vec!["A".to_string(), "B".to_string()].into();
    let default_index: i32 = 1;
    let mapping: Arc<[i32]> = Arc::from(vec![0, 1]);
    let mut dec = Decoder::EnumResolved {
        indices: Vec::with_capacity(DEFAULT_CAPACITY),
        // Moved rather than cloned: the symbol table is not used again here.
        symbols: reader_symbols,
        mapping,
        default_index,
    };
    let mut data = Vec::new();
    for writer_index in [0, 1, 99] {
        data.extend_from_slice(&encode_avro_int(writer_index));
    }
    let mut cur = AvroCursor::new(&data);
    for _ in 0..3 {
        dec.decode(&mut cur).unwrap();
    }
    let arr = dec.flush(None).unwrap();
    let dict = arr
        .as_any()
        .downcast_ref::<DictionaryArray<Int32Type>>()
        .unwrap();
    // The last key is the default index standing in for writer index 99.
    let expected_keys = Int32Array::from(vec![0, 1, 1]);
    assert_eq!(dict.keys(), &expected_keys);
    let values = dict
        .values()
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    for (slot, symbol) in ["A", "B"].into_iter().enumerate() {
        assert_eq!(values.value(slot), symbol);
    }
}
2045
#[test]
fn test_enum_mapping_unknown_symbol_without_default_errors() {
    // Single reader symbol, a mapping whose only entry is -1 and a default
    // index of -1: decoding writer index 0 is expected to fail.
    let mut dec = Decoder::EnumResolved {
        indices: Vec::with_capacity(DEFAULT_CAPACITY),
        symbols: vec!["A".to_string()].into(),
        mapping: Arc::from(vec![-1]),
        default_index: -1,
    };
    let payload = encode_avro_int(0);
    let mut cursor = AvroCursor::new(&payload);
    let msg = dec
        .decode(&mut cursor)
        .expect_err("expected decode error for unresolved enum without default")
        .to_string();
    // The error text must mention both halves of the failure.
    assert!(
        msg.contains("not resolvable") && msg.contains("no default"),
        "unexpected error message: {msg}"
    );
}
2068
2069 fn make_record_resolved_decoder(
2070 reader_fields: &[(&str, DataType, bool)],
2071 writer_to_reader: Vec<Option<usize>>,
2072 mut skip_decoders: Vec<Option<super::Skipper>>,
2073 ) -> Decoder {
2074 let mut field_refs: Vec<FieldRef> = Vec::with_capacity(reader_fields.len());
2075 let mut encodings: Vec<Decoder> = Vec::with_capacity(reader_fields.len());
2076 for (name, dt, nullable) in reader_fields {
2077 field_refs.push(Arc::new(ArrowField::new(*name, dt.clone(), *nullable)));
2078 let enc = match dt {
2079 DataType::Int32 => Decoder::Int32(Vec::new()),
2080 DataType::Int64 => Decoder::Int64(Vec::new()),
2081 DataType::Utf8 => {
2082 Decoder::String(OffsetBufferBuilder::new(DEFAULT_CAPACITY), Vec::new())
2083 }
2084 other => panic!("Unsupported test reader field type: {other:?}"),
2085 };
2086 encodings.push(enc);
2087 }
2088 let fields: Fields = field_refs.into();
2089 Decoder::RecordResolved {
2090 fields,
2091 encodings,
2092 writer_to_reader: Arc::from(writer_to_reader),
2093 skip_decoders,
2094 }
2095 }
2096
#[test]
fn test_skip_writer_trailing_field_int32() {
    // Two writer fields: the first feeds reader field `id`, the second has no
    // reader counterpart and is handed to an Int32 skipper.
    let mut dec = make_record_resolved_decoder(
        &[("id", arrow_schema::DataType::Int32, false)],
        vec![Some(0), None],
        vec![None, Some(super::Skipper::Int32)],
    );
    let mut payload = Vec::new();
    for v in [7, 999] {
        payload.extend_from_slice(&encode_avro_int(v));
    }
    let mut cursor = AvroCursor::new(&payload);
    dec.decode(&mut cursor).unwrap();
    // The whole input, skipped field included, must have been consumed.
    assert_eq!(cursor.position(), payload.len());
    let flushed = dec.flush(None).unwrap();
    let record = flushed.as_any().downcast_ref::<StructArray>().unwrap();
    assert_eq!(record.len(), 1);
    let id_col = record.column_by_name("id").unwrap();
    let id = id_col.as_any().downcast_ref::<Int32Array>().unwrap();
    assert_eq!(id.value(0), 7);
}
2121
#[test]
fn test_skip_writer_middle_field_string() {
    // Three writer fields: the outer two feed reader fields `id` and `score`,
    // the middle one is handed to a String skipper.
    let reader_fields = [
        ("id", DataType::Int32, false),
        ("score", DataType::Int64, false),
    ];
    let mut dec = make_record_resolved_decoder(
        &reader_fields,
        vec![Some(0), None, Some(1)],
        vec![None, Some(Skipper::String), None],
    );
    let mut payload = Vec::new();
    payload.extend_from_slice(&encode_avro_int(42));
    // Bytes for the skipped middle field.
    payload.extend_from_slice(&encode_avro_bytes(b"abcdef"));
    payload.extend_from_slice(&encode_avro_long(1000));
    let mut cursor = AvroCursor::new(&payload);
    dec.decode(&mut cursor).unwrap();
    // The whole input, skipped field included, must have been consumed.
    assert_eq!(cursor.position(), payload.len());
    let flushed = dec.flush(None).unwrap();
    let record = flushed.as_any().downcast_ref::<StructArray>().unwrap();
    let id_col = record.column_by_name("id").unwrap();
    let id = id_col.as_any().downcast_ref::<Int32Array>().unwrap();
    let score_col = record.column_by_name("score").unwrap();
    let score = score_col.as_any().downcast_ref::<Int64Array>().unwrap();
    assert_eq!(id.value(0), 42);
    assert_eq!(score.value(0), 1000);
}
2156
#[test]
fn test_skip_writer_array_with_negative_block_count_fast() {
    // Writer field 0 is handed to a List(Int32) skipper; writer field 1 feeds
    // reader field `id`.
    let mut dec = make_record_resolved_decoder(
        &[("id", DataType::Int32, false)],
        vec![None, Some(0)],
        vec![Some(Skipper::List(Box::new(Skipper::Int32))), None],
    );
    // Encoded items of the skipped array: 1, 2, 3.
    let mut items = Vec::new();
    for v in [1, 2, 3] {
        items.extend_from_slice(&encode_avro_int(v));
    }
    let mut data = Vec::new();
    // Block header: count -3 followed by the byte length of the items.
    data.extend_from_slice(&encode_avro_long(-3));
    data.extend_from_slice(&encode_avro_long(items.len() as i64));
    data.extend_from_slice(&items);
    // A zero count ends the array.
    data.extend_from_slice(&encode_avro_long(0));
    // Value for `id`.
    data.extend_from_slice(&encode_avro_int(5));
    let mut cursor = AvroCursor::new(&data);
    dec.decode(&mut cursor).unwrap();
    assert_eq!(cursor.position(), data.len());
    let flushed = dec.flush(None).unwrap();
    let record = flushed.as_any().downcast_ref::<StructArray>().unwrap();
    let id_col = record.column_by_name("id").unwrap();
    let id = id_col.as_any().downcast_ref::<Int32Array>().unwrap();
    assert_eq!(id.len(), 1);
    assert_eq!(id.value(0), 5);
}
2188
#[test]
fn test_skip_writer_map_with_negative_block_count_fast() {
    // Writer field 0 is handed to a Map(Int32) skipper; writer field 1 feeds
    // reader field `id`.
    let mut dec = make_record_resolved_decoder(
        &[("id", DataType::Int32, false)],
        vec![None, Some(0)],
        vec![Some(Skipper::Map(Box::new(Skipper::Int32))), None],
    );
    // Encoded entries of the skipped map: "k1" -> 10 and "k2" -> 20.
    let mut pairs = Vec::new();
    pairs.extend_from_slice(&encode_avro_bytes(b"k1"));
    pairs.extend_from_slice(&encode_avro_int(10));
    pairs.extend_from_slice(&encode_avro_bytes(b"k2"));
    pairs.extend_from_slice(&encode_avro_int(20));
    let mut data = Vec::new();
    // Block header: count -2 followed by the byte length of the entries.
    data.extend_from_slice(&encode_avro_long(-2));
    data.extend_from_slice(&encode_avro_long(pairs.len() as i64));
    data.extend_from_slice(&pairs);
    // A zero count ends the map.
    data.extend_from_slice(&encode_avro_long(0));
    // Value for `id`.
    data.extend_from_slice(&encode_avro_int(123));
    let mut cursor = AvroCursor::new(&data);
    dec.decode(&mut cursor).unwrap();
    assert_eq!(cursor.position(), data.len());
    let flushed = dec.flush(None).unwrap();
    let record = flushed.as_any().downcast_ref::<StructArray>().unwrap();
    let id_col = record.column_by_name("id").unwrap();
    let id = id_col.as_any().downcast_ref::<Int32Array>().unwrap();
    assert_eq!(id.len(), 1);
    assert_eq!(id.value(0), 123);
}
2221
#[test]
fn test_skip_writer_nullable_field_union_nullfirst() {
    // Writer field 0 is handed to a null-first nullable Int32 skipper; writer
    // field 1 feeds reader field `id`.
    let skip_nullable_int =
        super::Skipper::Nullable(Nullability::NullFirst, Box::new(super::Skipper::Int32));
    let mut dec = make_record_resolved_decoder(
        &[("id", DataType::Int32, false)],
        vec![None, Some(0)],
        vec![Some(skip_nullable_int), None],
    );
    // Branch tag 0 with no payload for the skipped field, then id = 5.
    let mut null_row = Vec::new();
    null_row.extend_from_slice(&encode_avro_long(0));
    null_row.extend_from_slice(&encode_avro_int(5));
    // Branch tag 1, the skipped int 123, then id = 7.
    let mut value_row = Vec::new();
    value_row.extend_from_slice(&encode_avro_long(1));
    value_row.extend_from_slice(&encode_avro_int(123));
    value_row.extend_from_slice(&encode_avro_int(7));
    let mut null_cur = AvroCursor::new(&null_row);
    let mut value_cur = AvroCursor::new(&value_row);
    dec.decode(&mut null_cur).unwrap();
    dec.decode(&mut value_cur).unwrap();
    // Both rows must be consumed completely.
    assert_eq!(null_cur.position(), null_row.len());
    assert_eq!(value_cur.position(), value_row.len());
    let flushed = dec.flush(None).unwrap();
    let record = flushed.as_any().downcast_ref::<StructArray>().unwrap();
    let id_col = record.column_by_name("id").unwrap();
    let id = id_col.as_any().downcast_ref::<Int32Array>().unwrap();
    assert_eq!(id.len(), 2);
    assert_eq!(id.value(0), 5);
    assert_eq!(id.value(1), 7);
}
2260}