pub struct Decoder {
    /// Row decoder for the currently active writer schema
    active_decoder: RecordDecoder,
    /// Fingerprint of the currently active schema, if one has been observed
    active_fingerprint: Option<Fingerprint>,
    /// Maximum number of rows per produced RecordBatch
    batch_size: usize,
    /// Rows remaining before the current batch is full
    remaining_capacity: usize,
    /// Cache of row decoders keyed by schema fingerprint
    cache: IndexMap<Fingerprint, RecordDecoder>,
    /// Algorithm used to compute/interpret fingerprints in the framing
    fingerprint_algorithm: FingerprintAlgorithm,
    /// Whether strings are decoded as Utf8View arrays
    utf8_view: bool,
    /// Whether strict union-resolution rules are enforced
    strict_mode: bool,
    /// Schema switch deferred until the current batch is flushed
    pending_schema: Option<(Fingerprint, RecordDecoder)>,
    /// True when a prefix has been consumed and record body bytes are expected next
    awaiting_body: bool,
}
A low‑level, push‑based decoder from Avro bytes to Arrow RecordBatch.

Decoder is designed for streaming scenarios:

- You feed freshly received bytes using Self::decode, potentially multiple times, until at least one row is complete.
- You then drain completed rows with Self::flush, which yields a RecordBatch if any rows were finished since the last flush.
Unlike Reader, which is specialized for Avro Object Container Files, Decoder understands framed single‑object inputs and Confluent Schema Registry messages, switching schemas mid‑stream when the framing indicates a new fingerprint.
§Supported prefixes

On each new row boundary, Decoder tries to match one of the following “prefixes”:

- Single‑object encoding: magic 0xC3 0x01 + schema fingerprint (length depends on the configured FingerprintAlgorithm); see SINGLE_OBJECT_MAGIC.
- Confluent wire format: magic 0x00 + 4‑byte big‑endian schema id; see CONFLUENT_MAGIC.
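As a sketch of the byte layouts described above (these helpers and their names are illustrative, not part of arrow_avro):

```rust
// Build the two prefixes Decoder recognizes, following the layouts above.
fn single_object_prefix(rabin_fingerprint: u64) -> Vec<u8> {
    let mut p = vec![0xC3, 0x01]; // single-object magic
    p.extend_from_slice(&rabin_fingerprint.to_le_bytes()); // 8-byte little-endian fingerprint
    p
}

fn confluent_prefix(schema_id: u32) -> Vec<u8> {
    let mut p = vec![0x00]; // Confluent magic
    p.extend_from_slice(&schema_id.to_be_bytes()); // 4-byte big-endian schema id
    p
}
```

The Avro-encoded record body follows the prefix directly in both framings.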
The active fingerprint determines which cached row decoder is used to decode the following record body bytes.
§Schema switching semantics

When a new fingerprint is observed:

- If the current batch is empty, the decoder switches immediately.
- Otherwise, the current batch is finalized on the next flush, and only then does the decoder switch to the new schema. This guarantees that a single RecordBatch never mixes rows with different schemas.
§Examples

Build and use a Decoder for single‑object encoding:
use arrow_avro::schema::{AvroSchema, Fingerprint, SchemaStore};
use arrow_avro::reader::ReaderBuilder;

// Use a record schema at the top level so we can build an Arrow RecordBatch
let mut store = SchemaStore::new(); // Rabin fingerprinting by default
let avro = AvroSchema::new(
    r#"{"type":"record","name":"E","fields":[{"name":"x","type":"long"}]}"#.to_string()
);
let fp = store.register(avro)?;

let mut decoder = ReaderBuilder::new()
    .with_writer_schema_store(store)
    .with_batch_size(16)
    .build_decoder()?;

// Write a single-object framed row {x:7}: magic + LE fingerprint + Avro body
let mut frame = vec![0xC3, 0x01];
if let Fingerprint::Rabin(v) = fp {
    frame.extend_from_slice(&v.to_le_bytes());
}
frame.push(14); // zigzag-encoded long 7
decoder.decode(&frame)?;

let batch = decoder.flush()?.expect("one row");
assert_eq!(batch.num_rows(), 1);
Background: Avro’s single‑object encoding is defined as 0xC3 0x01 + 8‑byte little‑endian CRC‑64‑AVRO fingerprint of the writer schema + Avro binary body. See the Avro 1.11.1 specification for details: https://avro.apache.org/docs/1.11.1/specification/#single-object-encoding
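The Avro binary body mentioned above encodes longs with zigzag followed by variable-length base-128. A minimal sketch of that encoding (encode_long is an illustrative name, not the crate's API):

```rust
// Avro long encoding: zigzag, then base-128 varint, least-significant group first.
fn encode_long(v: i64) -> Vec<u8> {
    let mut z = ((v << 1) ^ (v >> 63)) as u64; // zigzag: small magnitudes -> small codes
    let mut out = Vec::new();
    loop {
        let byte = (z & 0x7F) as u8;
        z >>= 7;
        if z == 0 {
            out.push(byte); // final group: continuation bit clear
            break;
        }
        out.push(byte | 0x80); // continuation bit set
    }
    out
}
```

This is why a body of {x:7} with a single long field is the one byte 14.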
Build and use a Decoder for Confluent Schema Registry messages:
use arrow_avro::schema::{AvroSchema, SchemaStore, Fingerprint, FingerprintAlgorithm};
use arrow_avro::reader::ReaderBuilder;

let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::None);
store.set(
    Fingerprint::Id(1234),
    AvroSchema::new(r#"{"type":"record","name":"E","fields":[{"name":"x","type":"long"}]}"#.to_string()),
)?;

let mut decoder = ReaderBuilder::new()
    .with_writer_schema_store(store)
    .build_decoder()?;

// Encode two Confluent-framed messages {x:1} and {x:2}: magic 0x00 + BE id + body
for body in [2u8, 4u8] { // zigzag-encoded longs 1 and 2
    let mut msg = vec![0x00];
    msg.extend_from_slice(&1234u32.to_be_bytes());
    msg.push(body);
    decoder.decode(&msg)?;
}

let batch = decoder.flush()?.expect("two rows");
assert_eq!(batch.num_rows(), 2);
§Implementations

impl Decoder
pub fn schema(&self) -> SchemaRef

Returns the Arrow schema for the rows decoded by this decoder.

Note: with single‑object or Confluent framing, the schema may change at a row boundary when the input indicates a new fingerprint.
pub fn batch_size(&self) -> usize

Returns the configured maximum number of rows per batch.
pub fn decode(&mut self, data: &[u8]) -> Result<usize, ArrowError>

Feed a chunk of bytes into the decoder.

This will:

- Decode at most Self::batch_size rows;
- Return the number of input bytes consumed from data (which may be 0 if more bytes are required, or less than data.len() if a prefix/body straddles the chunk boundary);
- Defer producing a RecordBatch until you call Self::flush.

§Returns

The number of bytes consumed from data.

§Errors

Returns an error if:

- The input indicates an unknown fingerprint (not present in the provided SchemaStore);
- The Avro body is malformed;
- A strict‑mode union rule is violated (see ReaderBuilder::with_strict_mode).
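Because decode may consume fewer bytes than offered, callers typically loop until the buffer is drained or the decoder asks for more input. A sketch of that driver loop, with a closure standing in for Decoder::decode (feed_all is an illustrative name, not part of the crate):

```rust
// Drives a push-based decode function over a buffer, handling partial consumption.
// `decode` stands in for Decoder::decode: it returns how many bytes it consumed,
// or 0 when it needs more input than remains.
fn feed_all<E>(
    mut decode: impl FnMut(&[u8]) -> Result<usize, E>,
    data: &[u8],
) -> Result<usize, E> {
    let mut offset = 0;
    while offset < data.len() {
        let consumed = decode(&data[offset..])?;
        if consumed == 0 {
            break; // incomplete prefix/body: keep the tail for the next chunk
        }
        offset += consumed;
    }
    Ok(offset) // total bytes consumed; data[offset..] must be re-fed later
}
```

In real code you would also check batch_is_full inside the loop and call flush to drain completed rows before feeding more input.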
fn handle_prefix(&mut self, buf: &[u8]) -> Result<Option<usize>, ArrowError>

fn handle_prefix_common<const MAGIC_LEN: usize, const N: usize>(
    &mut self,
    buf: &[u8],
    magic: &[u8; MAGIC_LEN],
    fingerprint_from: impl FnOnce([u8; N]) -> Fingerprint,
) -> Result<Option<usize>, ArrowError>

Checks for the provided magic bytes at the start of buf and, if present, attempts to read the following fingerprint of N bytes, converting it to a Fingerprint using fingerprint_from.
fn handle_fingerprint<const N: usize>( &mut self, buf: &[u8], fingerprint_from: impl FnOnce([u8; N]) -> Fingerprint, ) -> Result<Option<usize>, ArrowError>
fn apply_pending_schema(&mut self)
fn apply_pending_schema_if_batch_empty(&mut self)
fn flush_and_reset(&mut self) -> Result<Option<RecordBatch>, ArrowError>
pub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError>

Produce a RecordBatch if at least one row is fully decoded, returning Ok(None) if no new rows are available.

If a schema change was detected while decoding rows for the current batch, the switch is applied after flushing this batch, so the next batch (if any) may have a different schema.
pub fn capacity(&self) -> usize

Returns the number of rows that can be added to this decoder before it is full.

pub fn batch_is_full(&self) -> bool

Returns true if the decoder has reached its capacity for the current batch.
pub fn batch_is_empty(&self) -> bool

Returns true if no rows have been decoded into the current batch.