```rust
pub struct Decoder {
    active_decoder: RecordDecoder,
    active_fingerprint: Option<Fingerprint>,
    batch_size: usize,
    remaining_capacity: usize,
    cache: IndexMap<Fingerprint, RecordDecoder>,
    fingerprint_algorithm: FingerprintAlgorithm,
    pending_schema: Option<(Fingerprint, RecordDecoder)>,
    awaiting_body: bool,
}
```
A low‑level, push‑based decoder from Avro bytes to Arrow RecordBatch.
Decoder is designed for streaming scenarios:
- You feed freshly received bytes using Self::decode, potentially multiple times, until at least one row is complete.
- You then drain completed rows with Self::flush, which yields a RecordBatch if any rows were finished since the last flush.
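To make the feed-then-drain protocol concrete, here is a sketch against a toy stand-in rather than the real Decoder. The PushDecoder trait, LineDecoder, and drive are hypothetical names invented for this illustration; the toy treats each newline-terminated line as one "row", omits error handling, and returns Vec<String> batches instead of a RecordBatch. It mirrors the documented contract: decode reports how many bytes it consumed, unconsumed bytes (such as a row straddling a chunk boundary) are kept and re-fed with the next chunk, and flush drains the completed rows.

```rust
/// Hypothetical push-decoder contract shaped like Decoder::decode / Decoder::flush
/// (this is NOT the arrow_avro API; errors are omitted for brevity).
trait PushDecoder {
    type Batch;
    /// Consume a prefix of `data` and return how many bytes were used.
    fn decode(&mut self, data: &[u8]) -> usize;
    /// Emit the rows completed since the last flush, if any.
    fn flush(&mut self) -> Option<Self::Batch>;
    fn batch_is_full(&self) -> bool;
}

/// Toy decoder: each '\n'-terminated line is one "row". batch_size must be >= 1.
struct LineDecoder {
    batch_size: usize,
    rows: Vec<String>,
}

impl PushDecoder for LineDecoder {
    type Batch = Vec<String>;

    fn decode(&mut self, data: &[u8]) -> usize {
        let mut consumed = 0;
        // Never decode more rows than the batch can hold.
        while self.rows.len() < self.batch_size {
            match data[consumed..].iter().position(|&b| b == b'\n') {
                Some(end) => {
                    let row = &data[consumed..consumed + end];
                    self.rows.push(String::from_utf8_lossy(row).into_owned());
                    consumed += end + 1; // include the '\n'
                }
                // Incomplete row: leave it unconsumed for the caller to re-feed.
                None => break,
            }
        }
        consumed
    }

    fn flush(&mut self) -> Option<Vec<String>> {
        if self.rows.is_empty() {
            None
        } else {
            Some(std::mem::take(&mut self.rows))
        }
    }

    fn batch_is_full(&self) -> bool {
        self.rows.len() >= self.batch_size
    }
}

/// Feed chunks as they "arrive", carrying unconsumed bytes into the next chunk.
fn drive<D: PushDecoder>(decoder: &mut D, chunks: &[&[u8]]) -> Vec<D::Batch> {
    let mut pending: Vec<u8> = Vec::new();
    let mut batches = Vec::new();
    for chunk in chunks {
        pending.extend_from_slice(chunk);
        loop {
            let n = decoder.decode(&pending);
            pending.drain(..n);
            if decoder.batch_is_full() {
                // Drain the full batch, then try to decode the remaining bytes.
                batches.extend(decoder.flush());
            } else {
                // Batch not full, so decode stopped because it needs more bytes.
                break;
            }
        }
    }
    // End of stream: drain whatever rows remain.
    batches.extend(decoder.flush());
    batches
}
```

With batch_size = 2 and the chunks "a\nb", "\nc\n", "d\n", drive returns [["a", "b"], ["c", "d"]]: the partial row b from the first chunk is held back and completed by the second.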
Unlike Reader, which is specialized for Avro Object Container Files, Decoder
understands framed single‑object inputs and Confluent Schema Registry messages,
switching schemas mid‑stream when the framing indicates a new fingerprint.
§Supported prefixes
On each new row boundary, Decoder tries to match one of the following “prefixes”:
- Single‑Object encoding: magic 0xC3 0x01 + schema fingerprint (length depends on the configured FingerprintAlgorithm); see SINGLE_OBJECT_MAGIC.
- Confluent wire format: magic 0x00 + 4‑byte big‑endian schema id; see CONFLUENT_MAGIC.
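A minimal sketch of matching these two prefixes at a row boundary, assuming the default 8-byte Rabin fingerprint for single-object framing. The Prefix enum, match_prefix, and the locally defined SO_MAGIC / CF_MAGIC constants are hypothetical stand-ins, not the arrow_avro items named above; a real decoder would go on to look the fingerprint or schema id up among its registered schemas.

```rust
/// Hypothetical outcome of inspecting the bytes at a row boundary.
#[derive(Debug, PartialEq)]
enum Prefix {
    /// 0xC3 0x01 + 8-byte little-endian Rabin fingerprint.
    SingleObject { fingerprint: u64, len: usize },
    /// 0x00 + 4-byte big-endian schema id.
    Confluent { schema_id: u32, len: usize },
    /// Looks like a known prefix, but the buffer ends before it is complete.
    NeedMoreBytes,
    /// No recognised magic at this position.
    Unframed,
}

// Local stand-ins for the magic constants (values taken from the list above).
const SO_MAGIC: [u8; 2] = [0xC3, 0x01];
const CF_MAGIC: u8 = 0x00;

fn match_prefix(buf: &[u8]) -> Prefix {
    if buf.is_empty() {
        return Prefix::NeedMoreBytes;
    }
    // Compare only the bytes that have arrived so far against the 2-byte magic.
    let have = buf.len().min(SO_MAGIC.len());
    if buf[..have] == SO_MAGIC[..have] {
        if buf.len() < 2 + 8 {
            return Prefix::NeedMoreBytes;
        }
        let mut fp = [0u8; 8];
        fp.copy_from_slice(&buf[2..10]);
        return Prefix::SingleObject { fingerprint: u64::from_le_bytes(fp), len: 10 };
    }
    if buf[0] == CF_MAGIC {
        if buf.len() < 1 + 4 {
            return Prefix::NeedMoreBytes;
        }
        let mut id = [0u8; 4];
        id.copy_from_slice(&buf[1..5]);
        return Prefix::Confluent { schema_id: u32::from_be_bytes(id), len: 5 };
    }
    Prefix::Unframed
}
```

The len field reports how many prefix bytes to skip before the Avro record body begins.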
The active fingerprint determines which cached row decoder is used to decode the following record body bytes.
§Schema switching semantics
When a new fingerprint is observed:
- If the current batch is empty, the decoder switches immediately;
- Otherwise, the current batch is finalized on the next flush, and only then does the decoder switch to the new schema. This guarantees that a single RecordBatch never mixes rows with different schemas.
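The switching rule can be modelled as a small state machine. Switcher and its methods are hypothetical, loosely echoing the active_fingerprint and pending_schema fields in the struct definition; fingerprints are plain u64 values and a batch is reduced to a row count. The sketch assumes that, while a switch is pending, no further rows are decoded until flush, which is one way to uphold the no-mixed-schemas guarantee.

```rust
/// Hypothetical model of the schema-switching rule (not the real Decoder).
struct Switcher {
    active: Option<u64>,
    pending: Option<u64>,
    rows_in_batch: usize,
}

impl Switcher {
    /// Called when a prefix announces fingerprint `fp` at a row boundary.
    fn on_fingerprint(&mut self, fp: u64) {
        if self.active == Some(fp) {
            return; // same schema: nothing to do
        }
        if self.rows_in_batch == 0 {
            // Empty batch: switch immediately.
            self.active = Some(fp);
            self.pending = None;
        } else {
            // Non-empty batch: remember the new schema until the next flush.
            self.pending = Some(fp);
        }
    }

    /// Decodes one row body; refuses while a switch is pending.
    fn decode_row(&mut self) -> bool {
        if self.pending.is_some() {
            return false; // the caller must flush first
        }
        self.rows_in_batch += 1;
        true
    }

    /// Finishes the batch as (fingerprint, row count), then applies any pending switch.
    fn flush(&mut self) -> Option<(u64, usize)> {
        let out = match (self.active, self.rows_in_batch) {
            (Some(fp), n) if n > 0 => Some((fp, n)),
            _ => None,
        };
        self.rows_in_batch = 0;
        if let Some(fp) = self.pending.take() {
            self.active = Some(fp);
        }
        out
    }
}
```

For example, two rows under fingerprint 1 followed by an announcement of fingerprint 2 leave the active schema at 1; the next flush returns (1, 2) and only then makes 2 active.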
§Examples
Build and use a Decoder for single‑object encoding:
```rust
use arrow_avro::schema::{AvroSchema, SchemaStore};
use arrow_avro::reader::ReaderBuilder;
// Use a record schema at the top level so we can build an Arrow RecordBatch
let mut store = SchemaStore::new(); // Rabin fingerprinting by default
let avro = AvroSchema::new(
    r#"{"type":"record","name":"E","fields":[{"name":"x","type":"long"}]}"#.to_string()
);
let fp = store.register(avro)?;
// --- Hidden: write a single-object framed row {x:7} ---
let mut decoder = ReaderBuilder::new()
    .with_writer_schema_store(store)
    .with_batch_size(16)
    .build_decoder()?;
// The framed bytes must be fed with decoder.decode(...) before flushing (not shown).
let batch = decoder.flush()?.expect("one row");
assert_eq!(batch.num_rows(), 1);
```
Background: Avro’s single‑object encoding is defined as 0xC3 0x01 + 8‑byte little‑endian CRC‑64‑AVRO fingerprint of the writer schema + Avro binary body.
See the Avro 1.11.1 specification for details: https://avro.apache.org/docs/1.11.1/specification/#single-object-encoding
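For illustration, the hidden single-object bytes can be built by hand. This sketch follows the Avro specification's reference algorithm for the 64-bit Rabin fingerprint (CRC-64-AVRO) and its zig-zag varint encoding of long. The specification computes the fingerprint over the schema's Parsing Canonical Form, which this sketch does not derive; its input is assumed to be canonical already, and whether the result matches what SchemaStore::register computes is not checked here. rabin_fingerprint, encode_long, and single_object_frame are hypothetical helpers, not arrow_avro APIs.

```rust
/// Seed value from the Avro spec's CRC-64-AVRO reference code.
const EMPTY: u64 = 0xc15d_213a_a4d7_a795;

/// Builds the 256-entry lookup table (recomputed per call for simplicity).
fn fp_table() -> [u64; 256] {
    let mut table = [0u64; 256];
    for (i, slot) in table.iter_mut().enumerate() {
        let mut fp = i as u64;
        for _ in 0..8 {
            // Spec: fp = (fp >>> 1) ^ (EMPTY & -(fp & 1))
            fp = (fp >> 1) ^ (EMPTY & (fp & 1).wrapping_neg());
        }
        *slot = fp;
    }
    table
}

/// 64-bit Rabin fingerprint of `bytes` (assumed to be Parsing Canonical Form).
fn rabin_fingerprint(bytes: &[u8]) -> u64 {
    let table = fp_table();
    let mut fp = EMPTY;
    for &b in bytes {
        fp = (fp >> 8) ^ table[((fp ^ b as u64) & 0xff) as usize];
    }
    fp
}

/// Avro long: zig-zag (0,-1,1,-2 -> 0,1,2,3), then base-128 varint.
fn encode_long(n: i64, out: &mut Vec<u8>) {
    let mut z = ((n << 1) ^ (n >> 63)) as u64;
    while z >= 0x80 {
        out.push(((z & 0x7F) as u8) | 0x80); // high bit: more bytes follow
        z >>= 7;
    }
    out.push(z as u8);
}

/// Single-object frame for the record E { x: long } used in the example.
fn single_object_frame(fingerprint: u64, x: i64) -> Vec<u8> {
    let mut out = vec![0xC3, 0x01]; // two-byte marker
    out.extend_from_slice(&fingerprint.to_le_bytes()); // 8-byte little-endian fingerprint
    encode_long(x, &mut out); // Avro binary body
    out
}
```

For {x:7} the frame is 11 bytes: the 2 marker bytes, 8 fingerprint bytes, and a single body byte 0x0E (zig-zag maps 7 to 14).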
Build and use a Decoder for Confluent Registry messages:
```rust
use arrow_avro::schema::{AvroSchema, SchemaStore, Fingerprint, FingerprintAlgorithm};
use arrow_avro::reader::ReaderBuilder;
let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::Id);
store.set(
    Fingerprint::Id(1234),
    AvroSchema::new(r#"{"type":"record","name":"E","fields":[{"name":"x","type":"long"}]}"#.to_string()),
)?;
// --- Hidden: encode two Confluent-framed messages {x:1} and {x:2} ---
let mut decoder = ReaderBuilder::new()
    .with_writer_schema_store(store)
    .build_decoder()?;
// The framed bytes must be fed with decoder.decode(...) before flushing (not shown).
let batch = decoder.flush()?.expect("two rows");
assert_eq!(batch.num_rows(), 2);
```
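The hidden Confluent bytes follow the layout from §Supported prefixes: a 0x00 magic byte, the schema id as a 4-byte big-endian integer, then the Avro binary body. Below is a hand-rolled sketch for {x:1} and {x:2} under schema id 1234; confluent_frame and two_rows are hypothetical helpers, and the two messages are concatenated only to mimic a stream that carries both.

```rust
/// Hypothetical helper: one Confluent wire-format message for record E { x: long }.
fn confluent_frame(schema_id: u32, x: i64) -> Vec<u8> {
    let mut out = vec![0x00]; // magic byte
    out.extend_from_slice(&schema_id.to_be_bytes()); // 4-byte big-endian schema id
    // Avro long body: zig-zag, then base-128 varint.
    let mut z = ((x << 1) ^ (x >> 63)) as u64;
    while z >= 0x80 {
        out.push(((z & 0x7F) as u8) | 0x80);
        z >>= 7;
    }
    out.push(z as u8);
    out
}

/// Two messages back to back, as they might be handed to the decoder.
fn two_rows() -> Vec<u8> {
    let mut bytes = confluent_frame(1234, 1);
    bytes.extend(confluent_frame(1234, 2));
    bytes
}
```

Each message is 6 bytes here: 1234 is 0x000004D2, and zig-zag maps 1 and 2 to the single bytes 0x02 and 0x04.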
§Implementations
impl Decoder
pub fn schema(&self) -> SchemaRef
Returns the Arrow schema for the rows decoded by this decoder.
Note: With single‑object or Confluent framing, the schema may change at a row boundary when the input indicates a new fingerprint.
pub fn batch_size(&self) -> usize
Returns the configured maximum number of rows per batch.
pub fn decode(&mut self, data: &[u8]) -> Result<usize, ArrowError>
Feed a chunk of bytes into the decoder.
This will:
- Decode at most Self::batch_size rows;
- Return the number of input bytes consumed from data (which may be 0 if more bytes are required, or less than data.len() if a prefix/body straddles the chunk boundary);
- Defer producing a RecordBatch until you call Self::flush.
§Returns
The number of bytes consumed from data.
§Errors
Returns an error if:
- The input indicates an unknown fingerprint (not present in the provided SchemaStore);
- The Avro body is malformed;
- A strict‑mode union rule is violated (see ReaderBuilder::with_strict_mode).
fn handle_prefix(&mut self, buf: &[u8]) -> Result<Option<usize>, ArrowError>
fn handle_prefix_common<const MAGIC_LEN: usize, const N: usize>(
&mut self,
buf: &[u8],
magic: &[u8; MAGIC_LEN],
fingerprint_from: impl FnOnce([u8; N]) -> Fingerprint,
) -> Result<Option<usize>, ArrowError>
This method checks for the provided magic bytes at the start of buf and, if present,
attempts to read the following fingerprint of N bytes, converting it to a
Fingerprint using fingerprint_from.
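A standalone sketch of the check described above, using the same const-generic shape: MAGIC_LEN magic bytes followed by an N-byte fingerprint that a caller-supplied closure converts. The name match_magic and the Match enum are hypothetical; the real method returns Result<Option<usize>, ArrowError>, and this sketch does not attempt to reproduce what those values mean.

```rust
/// Hypothetical result type for this sketch.
#[derive(Debug, PartialEq)]
enum Match<F> {
    /// The buffer does not start with the magic bytes.
    NoMagic,
    /// The magic matches so far, but the fingerprint is incomplete.
    NeedMore,
    /// Magic and fingerprint found; `consumed` bytes belong to the prefix.
    Found { fingerprint: F, consumed: usize },
}

fn match_magic<const MAGIC_LEN: usize, const N: usize, F>(
    buf: &[u8],
    magic: &[u8; MAGIC_LEN],
    fingerprint_from: impl FnOnce([u8; N]) -> F,
) -> Match<F> {
    // Compare only the bytes that have arrived so far.
    let have = buf.len().min(MAGIC_LEN);
    if buf[..have] != magic[..have] {
        return Match::NoMagic;
    }
    if buf.len() < MAGIC_LEN + N {
        return Match::NeedMore;
    }
    // Copy exactly N fingerprint bytes into a fixed-size array.
    let mut raw = [0u8; N];
    raw.copy_from_slice(&buf[MAGIC_LEN..MAGIC_LEN + N]);
    Match::Found { fingerprint: fingerprint_from(raw), consumed: MAGIC_LEN + N }
}
```

Because N is inferred from the converter's argument type, passing u64::from_le_bytes selects an 8-byte fingerprint and u32::from_be_bytes a 4-byte id, with no separate length parameter.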
fn handle_fingerprint<const N: usize>( &mut self, buf: &[u8], fingerprint_from: impl FnOnce([u8; N]) -> Fingerprint, ) -> Result<Option<usize>, ArrowError>
fn apply_pending_schema(&mut self)
fn apply_pending_schema_if_batch_empty(&mut self)
fn flush_and_reset(&mut self) -> Result<Option<RecordBatch>, ArrowError>
pub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError>
Produce a RecordBatch if at least one row is fully decoded, returning
Ok(None) if no new rows are available.
If a schema change was detected while decoding rows for the current batch, the schema switch is applied after flushing this batch, so the next batch (if any) may have a different schema.
pub fn capacity(&self) -> usize
Returns the number of rows that can be added to this decoder before it is full.
pub fn batch_is_full(&self) -> bool
Returns true if the decoder has reached its capacity for the current batch.
pub fn batch_is_empty(&self) -> bool
Returns true if no rows have been decoded into the current batch yet (i.e., the current batch is empty).