Struct Decoder

pub struct Decoder {
    active_decoder: RecordDecoder,
    active_fingerprint: Option<Fingerprint>,
    batch_size: usize,
    remaining_capacity: usize,
    cache: IndexMap<Fingerprint, RecordDecoder>,
    fingerprint_algorithm: FingerprintAlgorithm,
    utf8_view: bool,
    strict_mode: bool,
    pending_schema: Option<(Fingerprint, RecordDecoder)>,
    awaiting_body: bool,
}

A low‑level, push‑based decoder from Avro bytes to Arrow RecordBatch.

Decoder is designed for streaming scenarios:

  • You feed freshly received bytes using Self::decode, potentially multiple times, until at least one row is complete.
  • You then drain completed rows with Self::flush, which yields a RecordBatch if any rows were finished since the last flush.

Unlike Reader, which is specialized for Avro Object Container Files, Decoder understands framed single‑object inputs and Confluent Schema Registry messages, switching schemas mid‑stream when the framing indicates a new fingerprint.

§Supported prefixes

On each new row boundary, Decoder tries to match one of the following “prefixes”:

  • Single‑Object encoding: magic 0xC3 0x01 + schema fingerprint (length depends on the configured FingerprintAlgorithm); see SINGLE_OBJECT_MAGIC.
  • Confluent wire format: magic 0x00 + 4‑byte big‑endian schema id; see CONFLUENT_MAGIC.

The active fingerprint determines which cached row decoder is used to decode the following record body bytes.
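For reference, both prefixes can be built by hand. This is a minimal std-only sketch. `single_object_frame` and `confluent_frame` are hypothetical helper names, not arrow_avro APIs, and the single-object variant assumes the default 8-byte Rabin (CRC-64-AVRO) fingerprint.

```rust
/// Hypothetical helper (not an arrow_avro API): single-object frame =
/// magic 0xC3 0x01, then the 8-byte little-endian fingerprint, then the Avro body.
fn single_object_frame(fingerprint: u64, body: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(2 + 8 + body.len());
    out.extend_from_slice(&[0xC3, 0x01]);
    out.extend_from_slice(&fingerprint.to_le_bytes());
    out.extend_from_slice(body);
    out
}

/// Hypothetical helper (not an arrow_avro API): Confluent wire format =
/// magic 0x00, then the 4-byte big-endian schema id, then the Avro body.
fn confluent_frame(schema_id: u32, body: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(1 + 4 + body.len());
    out.push(0x00);
    out.extend_from_slice(&schema_id.to_be_bytes());
    out.extend_from_slice(body);
    out
}
```

For example, confluent_frame(1234, &[0x02]) produces 00 00 00 04 D2 02: the magic byte, schema id 1234 in big-endian order, and the one-byte body of a long with value 1.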

§Schema switching semantics

When a new fingerprint is observed:

  • If the current batch is empty, the decoder switches immediately;
  • Otherwise, the current batch is finalized on the next flush and only then does the decoder switch to the new schema. This guarantees that a single RecordBatch never mixes rows with different schemas.
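The switching rule can be modeled in a few lines. The sketch below is a hypothetical toy, not arrow_avro's implementation: fingerprints are plain u32 values, rows are i64 values, and a new fingerprint seen while rows are buffered is parked until the next flush, so every emitted batch carries exactly one fingerprint.

```rust
/// Hypothetical toy model of deferred schema switching (not arrow_avro's code).
struct ToyDecoder {
    active: Option<u32>,  // fingerprint of the rows in the current batch
    pending: Option<u32>, // new fingerprint parked until the next flush
    rows: Vec<i64>,       // rows decoded into the current batch
}

impl ToyDecoder {
    fn new() -> Self {
        ToyDecoder { active: None, pending: None, rows: Vec::new() }
    }

    /// Called when a prefix announces fingerprint `fp` at a row boundary.
    fn on_fingerprint(&mut self, fp: u32) {
        if self.active == Some(fp) {
            return; // same schema: nothing to switch
        }
        if self.rows.is_empty() {
            self.active = Some(fp); // empty batch: switch immediately
        } else {
            self.pending = Some(fp); // rows already buffered: defer the switch
        }
    }

    /// Accept a row only while no switch is pending.
    fn push_row(&mut self, value: i64) -> bool {
        if self.active.is_none() || self.pending.is_some() {
            return false; // caller must flush before decoding rows of the new schema
        }
        self.rows.push(value);
        true
    }

    /// Emit the finished batch tagged with its fingerprint, then apply any pending switch.
    fn flush(&mut self) -> Option<(u32, Vec<i64>)> {
        let batch = match self.active {
            Some(fp) if !self.rows.is_empty() => Some((fp, std::mem::take(&mut self.rows))),
            _ => None,
        };
        if let Some(fp) = self.pending.take() {
            self.active = Some(fp);
        }
        batch
    }
}
```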

§Examples

Build and use a Decoder for single‑object encoding:

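use arrow_avro::schema::{AvroSchema, Fingerprint, SchemaStore};
use arrow_avro::reader::ReaderBuilder;

// Use a record schema at the top level so we can build an Arrow RecordBatch
let mut store = SchemaStore::new(); // Rabin fingerprinting by default
let avro = AvroSchema::new(
    r#"{"type":"record","name":"E","fields":[{"name":"x","type":"long"}]}"#.to_string()
);
let fp = store.register(avro)?;

// Frame one row {x: 7}: magic 0xC3 0x01, the 8-byte little-endian Rabin
// fingerprint, then the Avro body (7 zig-zag encodes to the single byte 0x0E)
let mut bytes = vec![0xC3, 0x01];
match fp {
    Fingerprint::Rabin(v) => bytes.extend_from_slice(&v.to_le_bytes()),
    _ => unreachable!("SchemaStore::new() produces Rabin fingerprints"),
}
bytes.push(0x0E);

let mut decoder = ReaderBuilder::new()
    .with_writer_schema_store(store)
    .with_batch_size(16)
    .build_decoder()?;

// Push the framed bytes, then drain the completed row as a RecordBatch
decoder.decode(&bytes)?;
let batch = decoder.flush()?.expect("one row");
assert_eq!(batch.num_rows(), 1);
Ok::<(), Box<dyn std::error::Error>>(())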

Background: Avro’s single‑object encoding is defined as 0xC3 0x01 + 8‑byte little‑endian CRC‑64‑AVRO fingerprint of the writer schema + Avro binary body. See the Avro 1.11.1 spec for details. https://avro.apache.org/docs/1.11.1/specification/#single-object-encoding
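The Avro binary body of the example record is just its single long field. Avro encodes a long by zig-zag mapping it to an unsigned value and then writing a base-128 varint (low 7 bits first, high bit set on every byte except the last), so small values such as 7 fit in one byte. A minimal std-only encoder; the name `encode_avro_long` is chosen for illustration and is not an arrow_avro API.

```rust
/// Hypothetical helper: encode an Avro `long` (zig-zag, then base-128 varint).
fn encode_avro_long(value: i64, out: &mut Vec<u8>) {
    // Zig-zag maps signed to unsigned: 0->0, -1->1, 1->2, -2->3, 2->4, ...
    let mut n = ((value << 1) ^ (value >> 63)) as u64;
    // Emit 7 bits at a time, least significant group first, with a continuation bit.
    while n >= 0x80 {
        out.push(((n as u8) & 0x7F) | 0x80);
        n >>= 7;
    }
    out.push(n as u8);
}
```

So the complete single-object frame for {x:7} is C3 01, the eight fingerprint bytes, then 0E.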

Build and use a Decoder for Confluent Registry messages:

use arrow_avro::schema::{AvroSchema, SchemaStore, Fingerprint, FingerprintAlgorithm};
use arrow_avro::reader::ReaderBuilder;

let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::None);
store.set(Fingerprint::Id(1234), AvroSchema::new(r#"{"type":"record","name":"E","fields":[{"name":"x","type":"long"}]}"#.to_string()))?;

let mut decoder = ReaderBuilder::new()
    .with_writer_schema_store(store)
    .build_decoder()?;

// Encode and push two Confluent-framed messages {x:1} and {x:2}: magic 0x00,
// the 4-byte big-endian schema id, then the Avro body (1 -> 0x02, 2 -> 0x04)
for body in [0x02u8, 0x04] {
    let mut msg = vec![0x00];
    msg.extend_from_slice(&1234u32.to_be_bytes());
    msg.push(body);
    decoder.decode(&msg)?;
}

let batch = decoder.flush()?.expect("two rows");
assert_eq!(batch.num_rows(), 2);
Ok::<(), Box<dyn std::error::Error>>(())

Fields§

§active_decoder: RecordDecoder
§active_fingerprint: Option<Fingerprint>
§batch_size: usize
§remaining_capacity: usize
§cache: IndexMap<Fingerprint, RecordDecoder>
§fingerprint_algorithm: FingerprintAlgorithm
§utf8_view: bool
§strict_mode: bool
§pending_schema: Option<(Fingerprint, RecordDecoder)>
§awaiting_body: bool

Implementations§

impl Decoder

pub fn schema(&self) -> SchemaRef

Returns the Arrow schema for the rows decoded by this decoder.

Note: With single‑object or Confluent framing, the schema may change at a row boundary when the input indicates a new fingerprint.

pub fn batch_size(&self) -> usize

Returns the configured maximum number of rows per batch.

pub fn decode(&mut self, data: &[u8]) -> Result<usize, ArrowError>

Feed a chunk of bytes into the decoder.

This will:

  • Decode at most Self::batch_size rows, and no more than still fit in the current batch (see Self::capacity);
  • Return the number of input bytes consumed from data (which may be 0 if more bytes are required, or less than data.len() if a prefix/body straddles the chunk boundary or the batch becomes full);
  • Defer producing a RecordBatch until you call Self::flush.
§Returns

The number of bytes consumed from data.

§Errors

Returns an error if:

  • The input indicates an unknown fingerprint (one not present in the provided SchemaStore);
  • The Avro body is malformed;
  • A strict‑mode union rule is violated (see ReaderBuilder::with_strict_mode).
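Because decode may consume only part of a chunk, a caller typically keeps the unconsumed tail, presents it again together with the next chunk, and flushes whenever the batch fills. The sketch below shows that loop against a hypothetical `PushDecoder` trait and a toy decoder for fixed 4-byte records. It assumes, as the description above suggests, that bytes not consumed by one call must be supplied again on a later call; none of these names are arrow_avro APIs.

```rust
/// Hypothetical push-decoder contract mirroring decode/flush/batch_is_full.
trait PushDecoder {
    /// Consume a prefix of `data`, returning how many bytes were used.
    fn decode(&mut self, data: &[u8]) -> usize;
    fn batch_is_full(&self) -> bool;
    fn flush(&mut self) -> Option<Vec<u32>>;
}

/// Hypothetical toy decoder: each row is a 4-byte big-endian u32.
struct FixedWidth {
    batch_size: usize, // must be > 0, or the driver below never makes progress
    rows: Vec<u32>,
}

impl PushDecoder for FixedWidth {
    fn decode(&mut self, data: &[u8]) -> usize {
        let mut used = 0;
        // Stop when a row straddles the chunk boundary or the batch is full.
        while data.len() - used >= 4 && self.rows.len() < self.batch_size {
            let b = [data[used], data[used + 1], data[used + 2], data[used + 3]];
            self.rows.push(u32::from_be_bytes(b));
            used += 4;
        }
        used
    }
    fn batch_is_full(&self) -> bool {
        self.rows.len() >= self.batch_size
    }
    fn flush(&mut self) -> Option<Vec<u32>> {
        if self.rows.is_empty() { None } else { Some(std::mem::take(&mut self.rows)) }
    }
}

/// Feed arbitrary chunks, carrying unconsumed bytes forward, and collect batches.
fn drive<D: PushDecoder>(decoder: &mut D, chunks: &[&[u8]]) -> Vec<Vec<u32>> {
    let mut batches = Vec::new();
    let mut pending: Vec<u8> = Vec::new(); // bytes not yet consumed by the decoder
    for chunk in chunks {
        pending.extend_from_slice(chunk);
        loop {
            let used = decoder.decode(&pending);
            pending.drain(..used);
            if decoder.batch_is_full() {
                batches.extend(decoder.flush()); // make room, then keep decoding
            } else {
                break; // the rest needs more input
            }
        }
    }
    batches.extend(decoder.flush()); // emit the final partial batch, if any
    batches
}
```

Feeding five 4-byte rows split into chunks of 3, 7 and 10 bytes with batch_size 2 yields the batches [1, 2], [3, 4] and [5].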

fn handle_prefix(&mut self, buf: &[u8]) -> Result<Option<usize>, ArrowError>

fn handle_prefix_common<const MAGIC_LEN: usize, const N: usize>( &mut self, buf: &[u8], magic: &[u8; MAGIC_LEN], fingerprint_from: impl FnOnce([u8; N]) -> Fingerprint, ) -> Result<Option<usize>, ArrowError>

This method checks for the provided magic bytes at the start of buf and, if present, attempts to read the following fingerprint of N bytes, converting it to a Fingerprint using fingerprint_from.
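The const-generic shape lets one routine serve both framings: MAGIC_LEN is 2 for single-object and 1 for Confluent, and N is the fingerprint width (8 bytes for a Rabin fingerprint, 4 for a schema id). The sketch below reproduces the idea as a free function over a hypothetical `Fp` enum. Its three-way `Prefix` result (no magic, need more bytes, found) is an illustrative convention, not the private method's actual return contract.

```rust
/// Hypothetical stand-in for arrow_avro's Fingerprint.
#[derive(Debug, PartialEq)]
enum Fp {
    Rabin(u64),
    Id(u32),
}

/// Hypothetical outcome of checking one prefix.
#[derive(Debug, PartialEq)]
enum Prefix {
    NoMagic,          // the buffer does not start with this magic
    NeedMore,         // magic or fingerprint is cut off by the chunk boundary
    Found(Fp, usize), // fingerprint plus total prefix length in bytes
}

fn parse_prefix<const MAGIC_LEN: usize, const N: usize>(
    buf: &[u8],
    magic: &[u8; MAGIC_LEN],
    fingerprint_from: impl FnOnce([u8; N]) -> Fp,
) -> Prefix {
    // Compare only the bytes we have; a partial match may still become a full one.
    let avail = buf.len().min(MAGIC_LEN);
    if buf[..avail] != magic[..avail] {
        return Prefix::NoMagic;
    }
    if buf.len() < MAGIC_LEN + N {
        return Prefix::NeedMore;
    }
    // Copy the N fingerprint bytes that follow the magic into a fixed-size array.
    let mut bytes = [0u8; N];
    bytes.copy_from_slice(&buf[MAGIC_LEN..MAGIC_LEN + N]);
    Prefix::Found(fingerprint_from(bytes), MAGIC_LEN + N)
}
```

The caller picks the byte order in the closure: little-endian u64 for single-object fingerprints, big-endian u32 for Confluent schema ids.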

fn handle_fingerprint<const N: usize>( &mut self, buf: &[u8], fingerprint_from: impl FnOnce([u8; N]) -> Fingerprint, ) -> Result<Option<usize>, ArrowError>

fn apply_pending_schema(&mut self)

fn apply_pending_schema_if_batch_empty(&mut self)

fn flush_and_reset(&mut self) -> Result<Option<RecordBatch>, ArrowError>

pub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError>

Produce a RecordBatch if at least one row is fully decoded, returning Ok(None) if no new rows are available.

If a schema change was detected while decoding rows for the current batch, the schema switch is applied after flushing this batch, so the next batch (if any) may have a different schema.

pub fn capacity(&self) -> usize

Returns the number of rows that can be added to this decoder before it is full.

pub fn batch_is_full(&self) -> bool

Returns true if the decoder has reached its capacity for the current batch.

pub fn batch_is_empty(&self) -> bool

Returns true if no rows have been decoded into the current batch yet (i.e., the current batch is empty).
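These three accessors can all be derived from one counter. A minimal sketch, assuming (an inference from the private remaining_capacity field, not a documented guarantee) that the decoder tracks how many rows still fit in the current batch; the `Capacity` type is hypothetical.

```rust
/// Hypothetical bookkeeping behind capacity / batch_is_full / batch_is_empty.
struct Capacity {
    batch_size: usize,
    remaining: usize, // rows that still fit in the current batch
}

impl Capacity {
    fn new(batch_size: usize) -> Self {
        Capacity { batch_size, remaining: batch_size }
    }
    /// One row decoded; callers must check batch_is_full first (underflow panics in debug builds).
    fn record_row(&mut self) {
        self.remaining -= 1;
    }
    /// A flush starts a fresh batch.
    fn reset(&mut self) {
        self.remaining = self.batch_size;
    }
    fn capacity(&self) -> usize {
        self.remaining
    }
    fn batch_is_full(&self) -> bool {
        self.remaining == 0
    }
    fn batch_is_empty(&self) -> bool {
        self.remaining == self.batch_size
    }
}
```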

fn decode_block( &mut self, data: &[u8], count: usize, ) -> Result<(usize, usize), ArrowError>

fn flush_block(&mut self) -> Result<Option<RecordBatch>, ArrowError>

Trait Implementations§

impl Debug for Decoder

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter.

Auto Trait Implementations§

Blanket Implementations§

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T> Same for T

type Output = T

Should always be Self

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

fn vzip(self) -> V

impl<T> Allocation for T
where T: RefUnwindSafe + Send + Sync,