Module reader

Core functionality for reading Avro data into Arrow arrays.

Implements the primary reader interface and record decoding logic: facilities to read Apache Avro–encoded data into Arrow's RecordBatch format.

§Limitations

  • Avro unions with > 127 branches are not supported. When decoding Avro unions to Arrow UnionArray, Arrow stores the union type identifiers in an 8‑bit signed buffer (i8). This implies a practical limit of 127 distinct branch ids. Inputs that resolve to more than 127 branches will return an error. If you truly need more, model the schema as a union of unions, per the Arrow format spec.

    See: Arrow Columnar Format — Dense Union (“types buffer: 8‑bit signed; a union with more than 127 possible types can be modeled as a union of unions”).

This module exposes three layers of the API surface, from highest to lowest level:

  • ReaderBuilder: configures how Avro is read (batch size, strict union handling, string representation, reader schema, etc.) and produces either:
    • a Reader for Avro Object Container Files (OCF) read from any BufRead, or
    • a low-level Decoder for single‑object encoded Avro bytes and Confluent Schema Registry framed messages.
  • Reader: a convenient, synchronous iterator over RecordBatch decoded from an OCF input. Implements Iterator<Item = Result<RecordBatch, ArrowError>> and RecordBatchReader.
  • Decoder: a push‑based row decoder that consumes raw Avro bytes and yields ready RecordBatch values when batches fill. This is suitable for integrating with async byte streams, network protocols, or other custom data sources.

§Encodings and when to use which type

Use Reader (built with ReaderBuilder::build) when the input is an Avro Object Container File; use Decoder (built with ReaderBuilder::build_decoder) when the input is a stream of single‑object encoded or Confluent‑framed messages. The sections below walk through each case.
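
A minimal sketch contrasting the two construction paths. The file path "data.avro" is hypothetical, and the SchemaStore would be populated with the stream's writer schemas before decoding (see the registration examples below).

use std::fs::File;
use std::io::BufReader;
use arrow_avro::reader::ReaderBuilder;
use arrow_avro::schema::SchemaStore;

// OCF path: a `Reader` over any `BufRead` (here, a buffered file).
let file = BufReader::new(File::open("data.avro")?);
let reader = ReaderBuilder::new().build(file)?;

// Streaming path: a `Decoder` for single-object / Confluent framed bytes.
let store = SchemaStore::new(); // register writer schemas here before decoding
let decoder = ReaderBuilder::new()
    .with_writer_schema_store(store)
    .build_decoder()?;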

§Basic file usage (OCF)

Use ReaderBuilder::build to construct a Reader from any BufRead. The doctest below creates a tiny OCF in memory using AvroWriter and then reads it back.

use std::io::Cursor;
use std::sync::Arc;
use arrow_array::{ArrayRef, Int32Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use arrow_avro::writer::AvroWriter;
use arrow_avro::reader::ReaderBuilder;

// Build a minimal Arrow schema and batch
let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
let batch = RecordBatch::try_new(
    Arc::new(schema.clone()),
    vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
)?;

// Write an Avro OCF to memory
let buffer: Vec<u8> = Vec::new();
let mut writer = AvroWriter::new(buffer, schema.clone())?;
writer.write(&batch)?;
writer.finish()?;
let bytes = writer.into_inner();

// Read it back with ReaderBuilder
let mut reader = ReaderBuilder::new().build(Cursor::new(bytes))?;
let out = reader.next().unwrap()?;
assert_eq!(out.num_rows(), 3);
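
Reader also implements RecordBatchReader, so the Arrow schema is available before reading and any remaining batches can be drained with the iterator API. A short continuation of the example above (it only adds the RecordBatchReader trait import):

use arrow_array::RecordBatchReader;

let schema = reader.schema();  // Arrow schema derived from the file's writer schema
for batch in reader {          // drain whatever batches remain
    assert_eq!(batch?.schema(), schema);
}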

§Streaming usage (single‑object / Confluent)

The Decoder lets you integrate Avro decoding with any source of bytes by periodically calling Decoder::decode with new data and calling Decoder::flush to get a RecordBatch once at least one row is complete.

The example below shows how to decode from an arbitrary stream of bytes::Bytes using futures utilities. Note: this is illustrative and keeps a single in‑memory Bytes buffer for simplicity—real applications typically maintain a rolling buffer.

use bytes::{Buf, Bytes};
use futures::{Stream, StreamExt};
use std::task::{Poll, ready};
use arrow_array::RecordBatch;
use arrow_schema::ArrowError;
use arrow_avro::reader::Decoder;

/// Decode a stream of Avro-framed bytes into RecordBatch values.
fn decode_stream<S: Stream<Item = Bytes> + Unpin>(
    mut decoder: Decoder,
    mut input: S,
) -> impl Stream<Item = Result<RecordBatch, ArrowError>> {
    let mut buffered = Bytes::new();
    futures::stream::poll_fn(move |cx| {
        loop {
            if buffered.is_empty() {
                buffered = match ready!(input.poll_next_unpin(cx)) {
                    Some(b) => b,
                    None => break, // EOF
                };
            }
            // Feed as much as possible
            let decoded = match decoder.decode(buffered.as_ref()) {
                Ok(n) => n,
                Err(e) => return Poll::Ready(Some(Err(e))),
            };
            let read = buffered.len();
            buffered.advance(decoded);
            if decoded != read {
                // Not all bytes were consumed; stop feeding and let the
                // `flush` below emit any rows that completed.
                break
            }
        }
        // Return a batch if one or more rows are complete
        Poll::Ready(decoder.flush().transpose())
    })
}

§Building and using a Decoder for single‑object encoding (Rabin fingerprints)

The doctest below writes a single‑object framed record using the Avro writer (no manual varints) for the writer schema ({"type":"record","name":"User","fields":[{"name":"id","type":"long"}]}) and then decodes it into a RecordBatch.

use std::sync::Arc;
use std::collections::HashMap;
use arrow_array::{ArrayRef, Int64Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use arrow_avro::schema::{AvroSchema, SchemaStore, SCHEMA_METADATA_KEY, FingerprintStrategy};
use arrow_avro::writer::{WriterBuilder, format::AvroBinaryFormat};
use arrow_avro::reader::ReaderBuilder;

// Register the writer schema (Rabin fingerprint by default).
let mut store = SchemaStore::new();
let avro_schema = AvroSchema::new(r#"{"type":"record","name":"User","fields":[
  {"name":"id","type":"long"}]}"#.to_string());
let _fp = store.register(avro_schema.clone())?;

// Create a single-object framed record { id: 42 } with the Avro writer.
let mut md = HashMap::new();
md.insert(SCHEMA_METADATA_KEY.to_string(), avro_schema.json_string.clone());
let arrow = Schema::new_with_metadata(vec![Field::new("id", DataType::Int64, false)], md);
let batch = RecordBatch::try_new(
    Arc::new(arrow.clone()),
    vec![Arc::new(Int64Array::from(vec![42])) as ArrayRef],
)?;
let mut w = WriterBuilder::new(arrow)
    .with_fingerprint_strategy(FingerprintStrategy::Rabin) // SOE prefix
    .build::<_, AvroBinaryFormat>(Vec::new())?;
w.write(&batch)?;
w.finish()?;
let frame = w.into_inner(); // C3 01 + fp + Avro body

// Decode with a `Decoder`
let mut dec = ReaderBuilder::new()
  .with_writer_schema_store(store)
  .with_batch_size(1024)
  .build_decoder()?;

dec.decode(&frame)?;
let out = dec.flush()?.expect("one batch");
assert_eq!(out.num_rows(), 1);

See Avro 1.11.1 “Single object encoding” for details of the 2‑byte marker and little‑endian CRC‑64‑AVRO fingerprint: https://avro.apache.org/docs/1.11.1/specification/#single-object-encoding

§Building and using a Decoder for Confluent Schema Registry framing

The Confluent wire format is: 1‑byte magic 0x00, then a 4‑byte big‑endian schema ID, then the Avro body. The doctest below crafts two messages for the same schema ID and decodes them into a single RecordBatch with two rows.

use std::sync::Arc;
use std::collections::HashMap;
use arrow_array::{ArrayRef, Int64Array, StringArray, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use arrow_avro::schema::{AvroSchema, SchemaStore, Fingerprint, FingerprintAlgorithm, SCHEMA_METADATA_KEY, FingerprintStrategy};
use arrow_avro::writer::{WriterBuilder, format::AvroBinaryFormat};
use arrow_avro::reader::ReaderBuilder;

// Set up a store keyed by numeric IDs (Confluent).
let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::None);
let schema_id = 7u32;
let avro_schema = AvroSchema::new(r#"{"type":"record","name":"User","fields":[
  {"name":"id","type":"long"}, {"name":"name","type":"string"}]}"#.to_string());
store.set(Fingerprint::Id(schema_id), avro_schema.clone())?;

// Write two Confluent-framed messages {id:1,name:"a"} and {id:2,name:"b"}.
fn msg(id: i64, name: &str, schema: &AvroSchema, schema_id: u32) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
    let mut md = HashMap::new();
    md.insert(SCHEMA_METADATA_KEY.to_string(), schema.json_string.clone());
    let arrow = Schema::new_with_metadata(
        vec![Field::new("id", DataType::Int64, false), Field::new("name", DataType::Utf8, false)],
        md,
    );
    let batch = RecordBatch::try_new(
        Arc::new(arrow.clone()),
        vec![
          Arc::new(Int64Array::from(vec![id])) as ArrayRef,
          Arc::new(StringArray::from(vec![name])) as ArrayRef,
        ],
    )?;
    let mut w = WriterBuilder::new(arrow)
        .with_fingerprint_strategy(FingerprintStrategy::Id(schema_id)) // 0x00 + ID + body
        .build::<_, AvroBinaryFormat>(Vec::new())?;
    w.write(&batch)?; w.finish()?;
    Ok(w.into_inner())
}
let m1 = msg(1, "a", &avro_schema, schema_id)?;
let m2 = msg(2, "b", &avro_schema, schema_id)?;

// Decode both into a single batch.
let mut dec = ReaderBuilder::new()
  .with_writer_schema_store(store)
  .with_batch_size(1024)
  .build_decoder()?;
dec.decode(&m1)?;
dec.decode(&m2)?;
let batch = dec.flush()?.expect("batch");
assert_eq!(batch.num_rows(), 2);

See Confluent’s “Wire format” notes: magic byte 0x00, 4‑byte big‑endian schema ID, then the Avro‑encoded payload. https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format

§Schema resolution (reader vs. writer schemas)

Avro supports resolving data written with one schema (“writer”) into another (“reader”) using rules like field aliases, default values, and numeric promotions. In practice this lets you evolve schemas over time while remaining compatible with old data.

Spec background: See Avro’s Schema Resolution (aliases, defaults) and the Confluent Wire format (magic 0x00 + big‑endian schema id + Avro body). https://avro.apache.org/docs/1.11.1/specification/#schema-resolution https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format

§OCF example: rename a field and add a default via a reader schema

Below we write an OCF with a writer schema having fields id: long, name: string. We then read it with a reader schema that:

  • renames name to full_name via aliases, and
  • adds is_active: boolean with a default value true.

use std::io::Cursor;
use std::sync::Arc;
use arrow_array::{ArrayRef, Int64Array, StringArray, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use arrow_avro::writer::AvroWriter;
use arrow_avro::reader::ReaderBuilder;
use arrow_avro::schema::AvroSchema;

// Writer (past version): { id: long, name: string }
let writer_arrow = Schema::new(vec![
    Field::new("id", DataType::Int64, false),
    Field::new("name", DataType::Utf8, false),
]);
let batch = RecordBatch::try_new(
    Arc::new(writer_arrow.clone()),
    vec![
        Arc::new(Int64Array::from(vec![1, 2])) as ArrayRef,
        Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef,
    ],
)?;

// Write an OCF entirely in memory
let mut w = AvroWriter::new(Vec::<u8>::new(), writer_arrow)?;
w.write(&batch)?;
w.finish()?;
let bytes = w.into_inner();

// Reader (current version):
//  - record name "topLevelRecord" matches the crate's default for OCF
//  - renames `name` -> `full_name` via aliases (now nullable with a null default)
//  - adds `is_active: boolean` with default `true`
let reader_json = r#"
{
  "type": "record",
  "name": "topLevelRecord",
  "fields": [
    { "name": "id", "type": "long" },
    { "name": "full_name", "type": ["null","string"], "aliases": ["name"], "default": null },
    { "name": "is_active", "type": "boolean", "default": true }
  ]
}"#;

let mut reader = ReaderBuilder::new()
  .with_reader_schema(AvroSchema::new(reader_json.to_string()))
  .build(Cursor::new(bytes))?;

let out = reader.next().unwrap()?;
assert_eq!(out.num_rows(), 2);

§Confluent single‑object example: resolve past writer versions to the topic’s current reader schema

In this scenario, the reader schema is the topic's current schema, while the two writer schemas registered under Confluent IDs 0 and 1 represent past versions. The decoder uses the reader schema to resolve data written with either version.

use std::sync::Arc;
use std::collections::HashMap;
use arrow_avro::reader::ReaderBuilder;
use arrow_avro::schema::{
    AvroSchema, Fingerprint, FingerprintAlgorithm, SchemaStore,
    SCHEMA_METADATA_KEY, FingerprintStrategy,
};
use arrow_array::{ArrayRef, Int32Array, Int64Array, StringArray, RecordBatch};
use arrow_schema::{DataType, Field, Schema};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Reader: current topic schema (no reader-added fields)
    //   {"type":"record","name":"User","fields":[
    //     {"name":"id","type":"long"},
    //     {"name":"name","type":"string"}]}
    let reader_schema = AvroSchema::new(
        r#"{"type":"record","name":"User",
            "fields":[{"name":"id","type":"long"},{"name":"name","type":"string"}]}"#
            .to_string(),
    );

    // Register two *writer* schemas under Confluent IDs 0 and 1
    let writer_v0 = AvroSchema::new(
        r#"{"type":"record","name":"User",
            "fields":[{"name":"id","type":"int"},{"name":"name","type":"string"}]}"#
            .to_string(),
    );
    let writer_v1 = AvroSchema::new(
        r#"{"type":"record","name":"User",
            "fields":[{"name":"id","type":"long"},{"name":"name","type":"string"},
                      {"name":"email","type":["null","string"],"default":null}]}"#
            .to_string(),
    );

    let id_v0: u32 = 0;
    let id_v1: u32 = 1;

    let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::None); // integer IDs
    store.set(Fingerprint::Id(id_v0), writer_v0.clone())?;
    store.set(Fingerprint::Id(id_v1), writer_v1.clone())?;

    // Write two Confluent-framed messages using each writer version
    // frame0: writer v0 body {id:1001_i32, name:"v0-alice"}
    let mut md0 = HashMap::new();
    md0.insert(SCHEMA_METADATA_KEY.to_string(), writer_v0.json_string.clone());
    let arrow0 = Schema::new_with_metadata(
        vec![Field::new("id", DataType::Int32, false),
             Field::new("name", DataType::Utf8, false)], md0);
    let batch0 = RecordBatch::try_new(
        Arc::new(arrow0.clone()),
        vec![Arc::new(Int32Array::from(vec![1001])) as ArrayRef,
             Arc::new(StringArray::from(vec!["v0-alice"])) as ArrayRef])?;
    let mut w0 = arrow_avro::writer::WriterBuilder::new(arrow0)
        .with_fingerprint_strategy(FingerprintStrategy::Id(id_v0))
        .build::<_, arrow_avro::writer::format::AvroBinaryFormat>(Vec::new())?;
    w0.write(&batch0)?; w0.finish()?;
    let frame0 = w0.into_inner(); // 0x00 + id_v0 + body

    // frame1: writer v1 body {id:2002_i64, name:"v1-bob", email: Some("bob@example.com")}
    let mut md1 = HashMap::new();
    md1.insert(SCHEMA_METADATA_KEY.to_string(), writer_v1.json_string.clone());
    let arrow1 = Schema::new_with_metadata(
        vec![Field::new("id", DataType::Int64, false),
             Field::new("name", DataType::Utf8, false),
             Field::new("email", DataType::Utf8, true)], md1);
    let batch1 = RecordBatch::try_new(
        Arc::new(arrow1.clone()),
        vec![Arc::new(Int64Array::from(vec![2002])) as ArrayRef,
             Arc::new(StringArray::from(vec!["v1-bob"])) as ArrayRef,
             Arc::new(StringArray::from(vec![Some("bob@example.com")])) as ArrayRef])?;
    let mut w1 = arrow_avro::writer::WriterBuilder::new(arrow1)
        .with_fingerprint_strategy(FingerprintStrategy::Id(id_v1))
        .build::<_, arrow_avro::writer::format::AvroBinaryFormat>(Vec::new())?;
    w1.write(&batch1)?; w1.finish()?;
    let frame1 = w1.into_inner(); // 0x00 + id_v1 + body

    // Build a streaming Decoder that understands Confluent framing
    let mut decoder = ReaderBuilder::new()
        .with_reader_schema(reader_schema)
        .with_writer_schema_store(store)
        .with_batch_size(8) // small demo batches
        .build_decoder()?;

    // Decode each whole frame, then drain completed rows with flush()
    let mut total_rows = 0usize;

    let consumed0 = decoder.decode(&frame0)?;
    assert_eq!(consumed0, frame0.len(), "decoder must consume the whole frame");
    while let Some(batch) = decoder.flush()? { total_rows += batch.num_rows(); }

    let consumed1 = decoder.decode(&frame1)?;
    assert_eq!(consumed1, frame1.len(), "decoder must consume the whole frame");
    while let Some(batch) = decoder.flush()? { total_rows += batch.num_rows(); }

    // We sent 2 records so we should get 2 rows (possibly one per flush)
    assert_eq!(total_rows, 2);
    Ok(())
}

§Schema evolution and batch boundaries

Decoder supports mid‑stream schema changes when the input framing carries a schema fingerprint (single‑object or Confluent). When a new fingerprint is observed:

  • If the current RecordBatch is empty, the decoder switches to the new schema immediately.
  • If not, the decoder finishes the current batch first and only then switches.

Consequently, the schema of batches produced by Decoder::flush may change over time, and Decoder intentionally does not implement RecordBatchReader. In contrast, Reader (OCF) has a single writer schema for the entire file and therefore implements RecordBatchReader.
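
A minimal consumption sketch for a stream whose frames may switch writer schemas mid‑stream. Here frames (an iterator of framed messages) and a populated store are assumed, as in the Confluent examples above; the point is to read each flushed batch's schema rather than assuming it stays constant:

let mut decoder = ReaderBuilder::new()
    .with_writer_schema_store(store)
    .build_decoder()?;
for frame in frames {
    decoder.decode(&frame)?;
    while let Some(batch) = decoder.flush()? {
        // A schema switch only takes effect once the in-progress batch has
        // been emitted, so consecutive batches may carry different schemas.
        println!("rows = {}, fields = {:?}", batch.num_rows(), batch.schema().fields());
    }
}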

§Performance & memory

  • batch_size controls the maximum number of rows per RecordBatch. Larger batches amortize per‑batch overhead; smaller batches reduce peak memory usage and latency (see the configuration sketch after this list).
  • When utf8_view is enabled, string columns use Arrow’s StringViewArray, which can reduce allocations for short strings.
  • For OCF, blocks may be compressed; Reader will decompress using the codec specified in the file header and feed uncompressed bytes to the row Decoder.
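
A small configuration sketch for these options. with_utf8_view is assumed to be the ReaderBuilder toggle behind the utf8_view behavior described above, and bytes is an in‑memory OCF such as the one produced in the basic example:

use std::io::Cursor;
use arrow_avro::reader::ReaderBuilder;

let reader = ReaderBuilder::new()
    .with_batch_size(4096) // at most 4096 rows per RecordBatch
    .with_utf8_view(true)  // decode string columns as StringViewArray
    .build(Cursor::new(bytes))?;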

§Error handling

  • Incomplete inputs return parse errors with “Unexpected EOF”; callers typically provide more bytes and try again.
  • If a fingerprint is unknown to the provided SchemaStore, decoding fails with a descriptive error. Populate the store up front to avoid this (see the sketch after this list).
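
A small error‑handling sketch, assuming dec is a Decoder built as in the examples above and frame is one (possibly truncated) framed message; both an incomplete frame and an unregistered fingerprint surface as an ArrowError from decode:

match dec.decode(&frame) {
    Ok(_consumed) => {
        if let Some(batch) = dec.flush()? {
            println!("decoded {} rows", batch.num_rows());
        }
    }
    // Incomplete input: keep the unconsumed bytes and retry once more arrive.
    Err(e) if e.to_string().contains("Unexpected EOF") => { /* wait for more data */ }
    // Anything else (e.g. a fingerprint missing from the SchemaStore) is reported.
    Err(e) => eprintln!("decode failed: {e}"),
}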

Modules§

block 🔒
Decoder for Block
cursor 🔒
header 🔒
Decoder for Header
record 🔒
vlq 🔒

Structs§

Decoder
A low‑level, push‑based decoder from Avro bytes to Arrow RecordBatch.
Reader
A high‑level Avro Object Container File reader.
ReaderBuilder
A builder that configures and constructs Avro readers and decoders.

Functions§

is_incomplete_data 🔒
read_header 🔒
Read the Avro file header (magic, metadata, sync marker) from reader.