Crate arrow_avro

Convert data to / from the Apache Arrow memory format and Apache Avro.

This crate provides:

  • a reader that decodes Avro (Object Container Files, Single‑Object Encoding, and the Confluent Schema Registry wire format) into Arrow RecordBatches,
  • and a writer that encodes Arrow RecordBatches into Avro (OCF or SOE).

If you’re new to Arrow or Avro, see the Apache Arrow project (https://arrow.apache.org) and the Apache Avro specification (https://avro.apache.org/docs/).

§Example: OCF (Object Container File) round‑trip (runnable)

The example below builds an Arrow batch, writes an Avro OCF entirely in memory, and then reads it back. OCF is a self‑describing file format: it embeds the writer's Avro schema in a file header, supports optional block compression, and separates blocks with sync markers. Spec: https://avro.apache.org/docs/1.11.1/specification/#object-container-files

use std::io::Cursor;
use std::sync::Arc;
use arrow_array::{ArrayRef, Int32Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use arrow_avro::writer::AvroWriter;
use arrow_avro::reader::ReaderBuilder;

// Build a tiny Arrow batch
let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
let batch = RecordBatch::try_new(
    Arc::new(schema.clone()),
    vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
)?;

// Write an Avro Object Container File (OCF) to a Vec<u8>
let sink: Vec<u8> = Vec::new();
let mut w = AvroWriter::new(sink, schema.clone())?;
w.write(&batch)?;
w.finish()?;
let bytes = w.into_inner();
assert!(!bytes.is_empty());

// Read it back
let mut r = ReaderBuilder::new().build(Cursor::new(bytes))?;
let out = r.next().unwrap()?;
assert_eq!(out.num_rows(), 3);
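
Block compression is chosen at write time. Below is a minimal sketch of the same round‑trip with a compressed OCF; it assumes the WriterBuilder::with_compression method and the CompressionCodec enum from this crate's compression module (with the corresponding deflate cargo feature enabled):

use std::io::Cursor;
use std::sync::Arc;
use arrow_array::{ArrayRef, Int32Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use arrow_avro::writer::{AvroWriter, WriterBuilder};
use arrow_avro::reader::ReaderBuilder;
use arrow_avro::compression::CompressionCodec;

let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
let batch = RecordBatch::try_new(
    Arc::new(schema.clone()),
    vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
)?;

// Assumption: `with_compression` selects the OCF block codec. The codec is
// recorded in the OCF header, so the reader needs no extra configuration.
let mut w: AvroWriter<Vec<u8>> = WriterBuilder::new(schema)
    .with_compression(Some(CompressionCodec::Deflate))
    .build(Vec::new())?;
w.write(&batch)?;
w.finish()?;
let bytes = w.into_inner();

// Read back: the reader detects the codec from the header.
let mut r = ReaderBuilder::new().build(Cursor::new(bytes))?;
assert_eq!(r.next().unwrap()?.num_rows(), 3);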

§Quickstart: SOE (Single‑Object Encoding) round‑trip (runnable)

Avro Single‑Object Encoding (SOE) prefixes each Avro‑encoded body with a 2‑byte marker 0xC3 0x01 and an 8‑byte little‑endian CRC‑64‑AVRO (Rabin) fingerprint of the writer schema. Spec: https://avro.apache.org/docs/1.11.1/specification/#single-object-encoding

This example registers the writer schema in a SchemaStore (computing its Rabin fingerprint), stream‑writes a single‑row SOE frame with AvroStreamWriter, and decodes it back to Arrow.

use std::collections::HashMap;
use std::sync::Arc;
use arrow_array::{ArrayRef, Int64Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use arrow_avro::writer::{AvroStreamWriter, WriterBuilder};
use arrow_avro::reader::ReaderBuilder;
use arrow_avro::schema::{AvroSchema, SchemaStore, FingerprintStrategy, SCHEMA_METADATA_KEY};

// Writer schema: { "type":"record","name":"User","fields":[{"name":"x","type":"long"}] }
let writer_json = r#"{"type":"record","name":"User","fields":[{"name":"x","type":"long"}]}"#;
let mut store = SchemaStore::new(); // Rabin CRC‑64‑AVRO by default
let _fp = store.register(AvroSchema::new(writer_json.to_string()))?;

// Build an Arrow schema that references the same Avro JSON
let mut md = HashMap::new();
md.insert(SCHEMA_METADATA_KEY.to_string(), writer_json.to_string());
let schema = Schema::new_with_metadata(
    vec![Field::new("x", DataType::Int64, false)],
    md,
);

// One‑row batch: { x: 7 }
let batch = RecordBatch::try_new(
    Arc::new(schema.clone()),
    vec![Arc::new(Int64Array::from(vec![7])) as ArrayRef],
)?;

// Stream‑write a single record; the writer adds the SOE prefix (C3 01 + Rabin) automatically.
let sink: Vec<u8> = Vec::new();
let mut w: AvroStreamWriter<Vec<u8>> = WriterBuilder::new(schema.clone())
    .with_fingerprint_strategy(FingerprintStrategy::Rabin)
    .build(sink)?;
w.write(&batch)?;
w.finish()?;
let frame = w.into_inner(); // already: C3 01 + 8B LE Rabin + Avro body
assert!(frame.len() > 10);

// Decode
let mut dec = ReaderBuilder::new()
    .with_writer_schema_store(store)
    .build_decoder()?;
dec.decode(&frame)?;
let out = dec.flush()?.expect("one row");
assert_eq!(out.num_rows(), 1);
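
The frame layout can also be verified by hand from the spec above; this continuation of the example uses only the std library, not crate APIs:

// Per the SOE spec: 2-byte marker, 8-byte little-endian Rabin fingerprint,
// then the Avro-encoded body.
assert_eq!(&frame[..2], &[0xC3, 0x01]);
let fp = u64::from_le_bytes(frame[2..10].try_into().unwrap());
println!("writer-schema fingerprint: {fp:#018x}");
// The remaining bytes are the Avro body; for this one-row batch that should
// be the single zigzag-encoded long 7 (0x0E).
let body = &frame[10..];
assert!(!body.is_empty());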

§Modules

  • reader: read Avro (OCF, SOE, Confluent) into Arrow RecordBatches; the Confluent framing is sketched after this list.
  • writer: write Arrow RecordBatches as Avro (OCF, SOE, Confluent, Apicurio).
  • schema: Avro schema parsing / fingerprints / registries.
  • compression: codecs used for OCF block compression (i.e., Deflate, Snappy, Zstandard, BZip2, and XZ).
  • codec: internal Avro-Arrow type conversion and row decode/encode plans.
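
The Confluent Schema Registry wire format accepted by reader frames messages differently from SOE: a 1‑byte magic 0x00, a 4‑byte big‑endian schema ID, then the Avro body. The sketch below builds such a frame by hand; the ID 42 is hypothetical, and decoding it would require a Decoder whose SchemaStore maps that registry ID to the writer schema:

// Confluent wire format: magic byte, then the registry-assigned schema ID
// in big-endian, then the Avro-encoded body (no schema fingerprint).
let schema_id: u32 = 42;   // hypothetical Schema Registry ID
let body = [14u8];         // Avro body: zigzag-encoded long 7
let mut frame = Vec::with_capacity(5 + body.len());
frame.push(0x00);                                   // 1-byte magic
frame.extend_from_slice(&schema_id.to_be_bytes());  // 4-byte BE schema ID
frame.extend_from_slice(&body);
assert_eq!(frame.len(), 5 + body.len());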

§Features

OCF compression (enabled by default)

  • deflate — enable DEFLATE block compression (via flate2).
  • snappy — enable Snappy block compression with 4‑byte BE CRC32 (per Avro).
  • zstd — enable Zstandard block compression.
  • bzip2 — enable BZip2 block compression.
  • xz — enable XZ/LZMA block compression.

Schema fingerprints & helpers (opt‑in)

  • md5 — enable MD5 writer‑schema fingerprints.
  • sha256 — enable SHA‑256 writer‑schema fingerprints.
  • small_decimals — support for compact Arrow representations of small Avro decimals (Decimal32 and Decimal64).
  • avro_custom_types — interpret Avro fields annotated with Arrow‑specific logical types such as arrow.duration-nanos, arrow.duration-micros, arrow.duration-millis, or arrow.duration-seconds as Arrow Duration(TimeUnit); see the sketch after this list.
  • canonical_extension_types — enable support for Arrow canonical extension types from arrow-schema so arrow-avro can respect them during Avro↔Arrow mapping.
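
As an illustration of avro_custom_types (a sketch: the field name elapsed is hypothetical, and it assumes the annotation is carried on an Avro long, as is usual for Avro logical types over long), a writer‑schema field like this would map to Arrow Duration(TimeUnit::Nanosecond) when the feature is enabled:

// Hypothetical Avro field annotated with one of the logical types listed
// above; with `avro_custom_types` enabled it maps to Duration(Nanosecond).
let field_json =
    r#"{"name":"elapsed","type":{"type":"long","logicalType":"arrow.duration-nanos"}}"#;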

Notes

  • OCF compression codecs apply only to Object Container Files; they do not affect Avro Single‑Object Encoding.

Modules§

  • codec — Data type conversions between Avro and Arrow types
  • compression — Compression codec implementations for Avro
  • reader — Core functionality for reading Avro data into Arrow arrays
  • schema — Avro schema parsing and representation
  • writer — Core functionality for writing Arrow arrays as Avro data

Traits§

  • AvroFieldExt — Extension trait for AvroField to add Utf8View support