Convert data to/from the Apache Arrow memory format and Apache Avro.
This crate provides:
- a `reader` that decodes Avro (Object Container Files, Avro Single‑Object Encoding, and the Confluent Schema Registry wire format) into Arrow `RecordBatch`es, and
- a `writer` that encodes Arrow `RecordBatch`es into Avro (OCF or SOE).
If you’re new to Arrow or Avro, see:
- Arrow project site: https://arrow.apache.org/
- Avro 1.11.1 specification: https://avro.apache.org/docs/1.11.1/specification/
§Example: OCF (Object Container File) round‑trip (runnable)
The example below creates an Arrow table, writes an Avro OCF fully in memory, and then reads it back. OCF is a self‑describing file format: the Avro schema is embedded in the file header, and the data follows in blocks with optional compression and sync markers. Spec: https://avro.apache.org/docs/1.11.1/specification/#object-container-files
```rust
use std::io::Cursor;
use std::sync::Arc;
use arrow_array::{ArrayRef, Int32Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use arrow_avro::writer::AvroWriter;
use arrow_avro::reader::ReaderBuilder;
// Build a tiny Arrow batch
let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
let batch = RecordBatch::try_new(
    Arc::new(schema.clone()),
    vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
)?;
// Write an Avro Object Container File (OCF) to a Vec<u8>
let sink: Vec<u8> = Vec::new();
let mut w = AvroWriter::new(sink, schema.clone())?;
w.write(&batch)?;
w.finish()?;
let bytes = w.into_inner();
assert!(!bytes.is_empty());
// Read it back
let mut r = ReaderBuilder::new().build(Cursor::new(bytes))?;
let out = r.next().unwrap()?;
assert_eq!(out.num_rows(), 3);
```
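The reader yields `RecordBatch` results one at a time, so larger files can be consumed without collecting everything up front. A minimal sketch, assuming `bytes` holds an OCF like the one above and that the reader keeps yielding batches until the file is exhausted:

```rust
use std::io::Cursor;
use arrow_avro::reader::ReaderBuilder;

let mut reader = ReaderBuilder::new().build(Cursor::new(bytes))?;
let mut total_rows = 0;
// Each call to `next` yields `Option<Result<RecordBatch, _>>`.
while let Some(batch) = reader.next() {
    total_rows += batch?.num_rows();
}
assert_eq!(total_rows, 3);
```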
§Quickstart: SOE (Single‑Object Encoding) round‑trip (runnable)
Avro Single‑Object Encoding (SOE) prefixes an Avro body with the 2‑byte marker
`0xC3 0x01` and an 8‑byte little‑endian CRC‑64‑AVRO (Rabin) fingerprint of the
writer schema. Spec:
https://avro.apache.org/docs/1.11.1/specification/#single-object-encoding
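To make the framing concrete, here is a minimal sketch of taking such a frame apart with plain `std` (the `split_soe` helper is hypothetical, not part of this crate):

```rust
/// Splits an SOE frame into (writer-schema fingerprint, Avro body).
/// Returns `None` if the buffer is too short or the marker is wrong.
fn split_soe(frame: &[u8]) -> Option<(u64, &[u8])> {
    // 2-byte marker + 8-byte fingerprint = 10-byte prefix.
    let (header, body) = (frame.get(..10)?, frame.get(10..)?);
    if header[..2] != [0xC3, 0x01] {
        return None; // not a single-object frame
    }
    // Little-endian CRC-64-AVRO (Rabin) fingerprint of the writer schema.
    let fp = u64::from_le_bytes(header[2..10].try_into().ok()?);
    Some((fp, body))
}
```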
This example registers the writer schema (computing a Rabin fingerprint), writes a single‑row Avro body with `AvroStreamWriter` (which constructs the SOE frame), and decodes it back to Arrow.
```rust
use std::collections::HashMap;
use std::sync::Arc;
use arrow_array::{ArrayRef, Int64Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use arrow_avro::writer::{AvroStreamWriter, WriterBuilder};
use arrow_avro::reader::ReaderBuilder;
use arrow_avro::schema::{AvroSchema, SchemaStore, FingerprintStrategy, SCHEMA_METADATA_KEY};
// Writer schema: { "type":"record","name":"User","fields":[{"name":"x","type":"long"}] }
let writer_json = r#"{"type":"record","name":"User","fields":[{"name":"x","type":"long"}]}"#;
let mut store = SchemaStore::new(); // Rabin CRC‑64‑AVRO by default
let _fp = store.register(AvroSchema::new(writer_json.to_string()))?;
// Build an Arrow schema that references the same Avro JSON
let mut md = HashMap::new();
md.insert(SCHEMA_METADATA_KEY.to_string(), writer_json.to_string());
let schema = Schema::new_with_metadata(
    vec![Field::new("x", DataType::Int64, false)],
    md,
);
// One‑row batch: { x: 7 }
let batch = RecordBatch::try_new(
    Arc::new(schema.clone()),
    vec![Arc::new(Int64Array::from(vec![7])) as ArrayRef],
)?;
// Stream-write a single record; the writer adds the SOE prefix
// (C3 01 + Rabin fingerprint) automatically.
let sink: Vec<u8> = Vec::new();
let mut w: AvroStreamWriter<Vec<u8>> = WriterBuilder::new(schema.clone())
    .with_fingerprint_strategy(FingerprintStrategy::Rabin)
    .build(sink)?;
w.write(&batch)?;
w.finish()?;
let frame = w.into_inner(); // already: C3 01 + 8B LE Rabin + Avro body
assert!(frame.len() > 10);
// Decode
let mut dec = ReaderBuilder::new()
    .with_writer_schema_store(store)
    .build_decoder()?;
dec.decode(&frame)?;
let out = dec.flush()?.expect("one row");
assert_eq!(out.num_rows(), 1);
```
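As a sanity check, the hypothetical `split_soe` helper sketched earlier can take apart the frame just produced. The exact body byte assumes the writer emits a single SOE frame for the one‑row batch:

```rust
let (fp, body) = split_soe(&frame).expect("well-formed SOE frame");
// `fp` is the Rabin fingerprint the decoder uses to look up the writer schema.
let _ = fp;
// The body is the bare Avro binary for { x: 7 }: one zig-zag varint (7 -> 0x0E).
assert_eq!(body, &[0x0E]);
```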
§async Reading (async feature)
The `reader` module provides async APIs for reading Avro files when the `async`
feature is enabled.
`AsyncAvroFileReader` implements `Stream<Item = Result<RecordBatch, ArrowError>>`,
allowing efficient async streaming of record batches. When the `object_store`
feature is enabled, `AvroObjectReader` provides integration with object storage
services such as S3 via the `object_store` crate.
```rust
use std::sync::Arc;
use arrow_avro::reader::{AsyncAvroFileReader, AvroObjectReader};
use futures::TryStreamExt;
use object_store::ObjectStore;
use object_store::local::LocalFileSystem;
use object_store::path::Path;
let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
let path = Path::from("data/example.avro");
let meta = store.head(&path).await?;
let reader = AvroObjectReader::new(store, path);
let stream = AsyncAvroFileReader::builder(reader, meta.size, 1024)
    .try_build()
    .await?;
let batches: Vec<_> = stream.try_collect().await?;
```
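Collecting is convenient for small files; batches can also be processed as they arrive. A sketch using `futures::TryStreamExt::try_next`, assuming the same `reader` and `meta` setup as the example above:

```rust
// Box::pin keeps this sketch independent of whether the stream type is Unpin.
let mut stream = Box::pin(
    AsyncAvroFileReader::builder(reader, meta.size, 1024)
        .try_build()
        .await?,
);
while let Some(batch) = stream.try_next().await? {
    println!("read {} rows", batch.num_rows());
}
```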
§Modules
- `reader`: read Avro (OCF, SOE, Confluent) into Arrow `RecordBatch`es.
  - With the `async` feature: `AsyncAvroFileReader` for async streaming reads.
  - With the `object_store` feature: `AvroObjectReader` for reading from cloud storage.
- `writer`: write Arrow `RecordBatch`es as Avro (OCF, SOE, Confluent, Apicurio).
- `schema`: Avro schema parsing / fingerprints / registries.
- `compression`: codecs used for OCF block compression (i.e., Deflate, Snappy, Zstandard, BZip2, and XZ).
- `codec`: internal Avro↔Arrow type conversion and row decode/encode plans.
§Features
OCF compression (enabled by default)
- `deflate` — enable DEFLATE block compression (via `flate2`).
- `snappy` — enable Snappy block compression with a 4‑byte big‑endian CRC32 (per the Avro spec).
- `zstd` — enable Zstandard block compression.
- `bzip2` — enable BZip2 block compression.
- `xz` — enable XZ/LZMA block compression.
Async & Object Store (opt‑in)
- `async` — enable async APIs for reading Avro (`AsyncAvroFileReader`, the `AsyncFileReader` trait).
- `object_store` — enable integration with the `object_store` crate for reading Avro from cloud storage (S3, GCS, Azure Blob, etc.) via `AvroObjectReader`. Implies `async`.
Schema fingerprints & helpers (opt‑in)
- `md5` — enable MD5 writer‑schema fingerprints.
- `sha256` — enable SHA‑256 writer‑schema fingerprints.
- `small_decimals` — support compact Arrow representations of small Avro decimals (`Decimal32` and `Decimal64`).
- `avro_custom_types` — interpret Avro fields annotated with Arrow‑specific logical types such as `arrow.duration-nanos`, `arrow.duration-micros`, `arrow.duration-millis`, or `arrow.duration-seconds` as Arrow `Duration(TimeUnit)`.
- `canonical_extension_types` — enable support for Arrow canonical extension types from `arrow-schema` so `arrow-avro` can respect them during Avro↔Arrow mapping.
Notes
- OCF compression codecs apply only to Object Container Files; they do not affect Avro Single‑Object Encoding.
§Modules
- `codec` — Data type conversions between Avro and Arrow types
- `compression` — Compression codec implementations for Avro
- `errors` — Common Avro errors and macros, including the `AvroError` variants
- `reader` — Core functionality for reading Avro data into Arrow arrays
- `schema` — Avro schema parsing and representation
- `writer` — Core functionality for writing Arrow arrays as Avro data
§Traits
- `AvroFieldExt` — Extension trait for `AvroField` to add `Utf8View` support