arrow_avro/lib.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Convert data to / from the [Apache Arrow] memory format and [Apache Avro].
19//!
20//! This crate provides:
21//! - a [`reader`] that decodes Avro (Object Container Files, Avro Single‑Object encoding,
22//! and Confluent Schema Registry wire format) into Arrow `RecordBatch`es,
23//! - and a [`writer`] that encodes Arrow `RecordBatch`es into Avro (OCF or raw Avro binary).
24//!
25//! If you’re new to Arrow or Avro, see:
26//! - Arrow project site: <https://arrow.apache.org/>
27//! - Avro 1.11.1 specification: <https://avro.apache.org/docs/1.11.1/specification/>
28//!
29//! ## Example: OCF (Object Container File) round‑trip
30//!
31//! The example below creates an Arrow table, writes an **Avro OCF** fully in memory,
32//! and then reads it back. OCF is a self‑describing file format that embeds the Avro
33//! schema in a header with optional compression and block sync markers.
34//! Spec: <https://avro.apache.org/docs/1.11.1/specification/#object-container-files>
35//!
36//! ```
37//! use std::io::Cursor;
38//! use std::sync::Arc;
39//! use arrow_array::{ArrayRef, Int32Array, RecordBatch};
40//! use arrow_schema::{DataType, Field, Schema};
41//! use arrow_avro::writer::AvroWriter;
42//! use arrow_avro::reader::ReaderBuilder;
43//!
44//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
45//! // Build a tiny Arrow batch
46//! let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
47//! let batch = RecordBatch::try_new(
48//! Arc::new(schema.clone()),
49//! vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
50//! )?;
51//!
52//! // Write an Avro **Object Container File** (OCF) to a Vec<u8>
53//! let sink: Vec<u8> = Vec::new();
54//! let mut w = AvroWriter::new(sink, schema.clone())?;
55//! w.write(&batch)?;
56//! w.finish()?;
57//! let bytes = w.into_inner();
58//! assert!(!bytes.is_empty());
59//!
60//! // Read it back
61//! let mut r = ReaderBuilder::new().build(Cursor::new(bytes))?;
62//! let out = r.next().unwrap()?;
63//! assert_eq!(out.num_rows(), 3);
64//! # Ok(()) }
65//! ```
66//!
67//! ## Quickstart: Confluent wire‑format round‑trip *(runnable)*
68//!
69//! The **Confluent Schema Registry wire format** prefixes each Avro message with a
70//! 1‑byte magic `0x00` and a **4‑byte big‑endian** schema ID, followed by the Avro body.
71//! See: <https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format>
72//!
//! In this round‑trip, we:
//! 1) Register the writer schema in a `SchemaStore` under its Schema Registry ID,
//! 2) Use `AvroStreamWriter` with `FingerprintStrategy::Id` to write a single‑row batch,
//!    letting the writer add the Confluent prefix (magic and schema ID),
//! 3) Decode it back to Arrow using a `Decoder` configured with that `SchemaStore`, which
//!    maps the schema ID to the Avro schema used by the writer.
78//!
79//! ```
80//! use std::collections::HashMap;
81//! use std::sync::Arc;
82//! use arrow_array::{ArrayRef, Int64Array, RecordBatch, StringArray};
83//! use arrow_schema::{DataType, Field, Schema};
84//! use arrow_avro::writer::{AvroStreamWriter, WriterBuilder};
85//! use arrow_avro::reader::ReaderBuilder;
86//! use arrow_avro::schema::{
87//! AvroSchema, SchemaStore, Fingerprint, FingerprintAlgorithm,
88//! FingerprintStrategy, SCHEMA_METADATA_KEY
89//! };
90//!
91//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
92//! // Writer schema registered under Schema Registry ID 1
93//! let avro_json = r#"{
94//! "type":"record","name":"User",
95//! "fields":[{"name":"id","type":"long"},{"name":"name","type":"string"}]
96//! }"#;
97//!
98//! let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::Id);
99//! let id: u32 = 1;
100//! store.set(Fingerprint::Id(id), AvroSchema::new(avro_json.to_string()))?;
101//!
102//! // Build an Arrow schema that references the same Avro JSON
103//! let mut md = HashMap::new();
104//! md.insert(SCHEMA_METADATA_KEY.to_string(), avro_json.to_string());
105//! let schema = Schema::new_with_metadata(
106//! vec![
107//! Field::new("id", DataType::Int64, false),
108//! Field::new("name", DataType::Utf8, false),
109//! ],
110//! md,
111//! );
112//!
113//! // One‑row batch: { id: 42, name: "alice" }
114//! let batch = RecordBatch::try_new(
115//! Arc::new(schema.clone()),
116//! vec![
117//! Arc::new(Int64Array::from(vec![42])) as ArrayRef,
118//! Arc::new(StringArray::from(vec!["alice"])) as ArrayRef,
119//! ],
120//! )?;
121//!
122//! // Stream‑write a single record, letting the writer add the **Confluent** prefix.
123//! let sink: Vec<u8> = Vec::new();
124//! let mut w: AvroStreamWriter<Vec<u8>> = WriterBuilder::new(schema.clone())
125//! .with_fingerprint_strategy(FingerprintStrategy::Id(id))
126//! .build(sink)?;
127//! w.write(&batch)?;
128//! w.finish()?;
129//! let frame = w.into_inner(); // already: 0x00 + 4B BE ID + Avro body
130//! assert!(frame.len() > 5);
131//!
132//! // Decode
133//! let mut dec = ReaderBuilder::new()
134//! .with_writer_schema_store(store)
135//! .build_decoder()?;
136//! dec.decode(&frame)?;
137//! let out = dec.flush()?.expect("one row");
138//! assert_eq!(out.num_rows(), 1);
139//! # Ok(()) }
140//! ```
141//!
142//! ## Quickstart: Avro Single‑Object Encoding round‑trip *(runnable)*
143//!
144//! Avro **Single‑Object Encoding (SOE)** wraps an Avro body with a 2‑byte marker
145//! `0xC3 0x01` and an **8‑byte little‑endian CRC‑64‑AVRO Rabin fingerprint** of the
146//! writer schema, then the Avro body. Spec:
147//! <https://avro.apache.org/docs/1.11.1/specification/#single-object-encoding>
148//!
//! This example registers the writer schema (computing a Rabin fingerprint), writes a
//! single‑row batch with an `AvroStreamWriter` that adds the SOE prefix automatically, and
//! decodes the resulting frame back to Arrow.
151//!
152//! ```
153//! use std::collections::HashMap;
154//! use std::sync::Arc;
155//! use arrow_array::{ArrayRef, Int64Array, RecordBatch};
156//! use arrow_schema::{DataType, Field, Schema};
157//! use arrow_avro::writer::{AvroStreamWriter, WriterBuilder};
158//! use arrow_avro::reader::ReaderBuilder;
159//! use arrow_avro::schema::{AvroSchema, SchemaStore, FingerprintStrategy, SCHEMA_METADATA_KEY};
160//!
161//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
162//! // Writer schema: { "type":"record","name":"User","fields":[{"name":"x","type":"long"}] }
163//! let writer_json = r#"{"type":"record","name":"User","fields":[{"name":"x","type":"long"}]}"#;
164//! let mut store = SchemaStore::new(); // Rabin CRC‑64‑AVRO by default
165//! let _fp = store.register(AvroSchema::new(writer_json.to_string()))?;
166//!
167//! // Build an Arrow schema that references the same Avro JSON
168//! let mut md = HashMap::new();
169//! md.insert(SCHEMA_METADATA_KEY.to_string(), writer_json.to_string());
170//! let schema = Schema::new_with_metadata(
171//! vec![Field::new("x", DataType::Int64, false)],
172//! md,
173//! );
174//!
175//! // One‑row batch: { x: 7 }
176//! let batch = RecordBatch::try_new(
177//! Arc::new(schema.clone()),
178//! vec![Arc::new(Int64Array::from(vec![7])) as ArrayRef],
179//! )?;
180//!
181//! // Stream‑write a single record; the writer adds **SOE** (C3 01 + Rabin) automatically.
182//! let sink: Vec<u8> = Vec::new();
183//! let mut w: AvroStreamWriter<Vec<u8>> = WriterBuilder::new(schema.clone())
184//! .with_fingerprint_strategy(FingerprintStrategy::Rabin)
185//! .build(sink)?;
186//! w.write(&batch)?;
187//! w.finish()?;
188//! let frame = w.into_inner(); // already: C3 01 + 8B LE Rabin + Avro body
189//! assert!(frame.len() > 10);
190//!
191//! // Decode
192//! let mut dec = ReaderBuilder::new()
193//! .with_writer_schema_store(store)
194//! .build_decoder()?;
195//! dec.decode(&frame)?;
196//! let out = dec.flush()?.expect("one row");
197//! assert_eq!(out.num_rows(), 1);
198//! # Ok(()) }
199//! ```
200//!
201//! ---
202//!
203//! ### Modules
204//!
205//! - [`reader`]: read Avro (OCF, SOE, Confluent) into Arrow `RecordBatch`es.
206//! - [`writer`]: write Arrow `RecordBatch`es as Avro (OCF, SOE, Confluent).
207//! - [`schema`]: Avro schema parsing / fingerprints / registries.
208//! - [`compression`]: codecs used for **OCF block compression** (i.e., Deflate, Snappy, Zstandard, BZip2, and XZ).
209//! - [`codec`]: internal Avro-Arrow type conversion and row decode/encode plans.
210//!
211//! ### Features
212//!
213//! **OCF compression (enabled by default)**
214//! - `deflate` — enable DEFLATE block compression (via `flate2`).
215//! - `snappy` — enable Snappy block compression with 4‑byte BE CRC32 (per Avro).
216//! - `zstd` — enable Zstandard block compression.
217//! - `bzip2` — enable BZip2 block compression.
218//! - `xz` — enable XZ/LZMA block compression.
219//!
220//! **Schema fingerprints & helpers (opt‑in)**
221//! - `md5` — enable MD5 writer‑schema fingerprints.
222//! - `sha256` — enable SHA‑256 writer‑schema fingerprints.
223//! - `small_decimals` — support for compact Arrow representations of small Avro decimals (`Decimal32` and `Decimal64`).
224//! - `avro_custom_types` — interpret Avro fields annotated with Arrow‑specific logical
225//! types such as `arrow.duration-nanos`, `arrow.duration-micros`,
226//! `arrow.duration-millis`, or `arrow.duration-seconds` as Arrow `Duration(TimeUnit)`.
227//! - `canonical_extension_types` — enable support for Arrow [canonical extension types]
228//! from `arrow-schema` so `arrow-avro` can respect them during Avro↔Arrow mapping.
229//!
230//! **Notes**
231//! - OCF compression codecs apply only to **Object Container Files**; they do not affect Avro
232//! single object encodings.
233//!
234//! [canonical extension types]: https://arrow.apache.org/docs/format/CanonicalExtensions.html
235//!
236//! [Apache Arrow]: https://arrow.apache.org/
237//! [Apache Avro]: https://avro.apache.org/
238
239#![doc(
240 html_logo_url = "https://arrow.apache.org/img/arrow-logo_chevrons_black-txt_white-bg.svg",
241 html_favicon_url = "https://arrow.apache.org/img/arrow-logo_chevrons_black-txt_transparent-bg.svg"
242)]
243#![cfg_attr(docsrs, feature(doc_cfg))]
244#![warn(missing_docs)]
245
246/// Core functionality for reading Avro data into Arrow arrays
247///
248/// Implements the primary reader interface and record decoding logic.
249pub mod reader;
250
251/// Core functionality for writing Arrow arrays as Avro data
252///
253/// Implements the primary writer interface and record encoding logic.
254pub mod writer;
255
256/// Avro schema parsing and representation
257///
258/// Provides types for parsing and representing Avro schema definitions.
259pub mod schema;
260
/// Compression codec implementations for Avro
///
/// Provides support for the compression algorithms used for Avro OCF blocks:
/// Deflate, Snappy, Zstandard, BZip2, and XZ.
265pub mod compression;
266
267/// Data type conversions between Avro and Arrow types
268///
269/// This module contains the necessary types and functions to convert between
270/// Avro data types and Arrow data types.
271pub mod codec;
272
/// Extension trait that adds Utf8View support to `AvroField`.
///
/// The methods declared here let an `AvroField` opt in to Utf8View handling.
pub trait AvroFieldExt {
    /// Produces a new field in which Utf8View support is switched on for string data.
    ///
    /// String data is then represented with `StringViewArray` rather than `StringArray`.
    fn with_utf8view(&self) -> Self;
}
282
283impl AvroFieldExt for codec::AvroField {
284 fn with_utf8view(&self) -> Self {
285 codec::AvroField::with_utf8view(self)
286 }
287}
288
#[cfg(test)]
mod test_util {
    /// Builds the location of a file inside the Arrow test data directory.
    ///
    /// The directory is read from the `ARROW_TEST_DATA` environment variable. When the
    /// variable is unset, unreadable, or contains only whitespace, the relative directory
    /// `../testing/data` is used instead. Surrounding whitespace in the variable is
    /// ignored.
    ///
    /// `path` is appended after a `/` separator and the combined string is returned;
    /// no check is made that the resulting location exists.
    pub fn arrow_test_data(path: &str) -> String {
        let configured = std::env::var("ARROW_TEST_DATA").ok();
        // A blank value would otherwise produce a root-anchored path such as
        // `/avro/file.avro`, so it is treated the same as an unset variable.
        let root = configured
            .as_deref()
            .map(str::trim)
            .filter(|dir| !dir.is_empty())
            .unwrap_or("../testing/data");
        format!("{root}/{path}")
    }
}
297}