
arrow_avro/writer/format.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Avro Writer Formats for Arrow.

use crate::compression::{CODEC_METADATA_KEY, CompressionCodec};
use crate::errors::AvroError;
use crate::schema::{AvroSchema, AvroSchemaOptions, SCHEMA_METADATA_KEY};
use crate::writer::encoder::write_long;
use arrow_schema::Schema;
use rand::RngCore;
use std::fmt::Debug;
use std::io::Write;

/// Format abstraction implemented by each container-level writer.
pub trait AvroFormat: Debug + Default {
    /// If `true`, the writer for this format will query `single_object_prefix()`
    /// and write the prefix before each record. If `false`, the writer can
    /// skip this step. This is a performance hint for the writer.
    const NEEDS_PREFIX: bool;

    /// Write any bytes required at the very beginning of the output stream
    /// (file header, etc.).
    /// Implementations **must not** write any record data.
    fn start_stream<W: Write>(
        &mut self,
        writer: &mut W,
        schema: &Schema,
        compression: Option<CompressionCodec>,
    ) -> Result<(), AvroError>;

    /// Return the 16-byte sync marker (OCF) or `None` (binary stream).
    fn sync_marker(&self) -> Option<&[u8; 16]>;
}
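
// Illustrative sketch only (not part of this module's API): how a container-level
// writer might drive an `AvroFormat`. Records are assumed to arrive already
// encoded as Avro record bodies; per-record prefixing and OCF block/sync-marker
// handling are elided for brevity.
#[allow(dead_code)]
fn drive_format_example<F: AvroFormat, W: Write>(
    format: &mut F,
    writer: &mut W,
    schema: &Schema,
    encoded_records: &[Vec<u8>],
) -> Result<(), AvroError> {
    // Emit any header bytes first (OCF magic + metadata + sync marker, or nothing).
    format.start_stream(writer, schema, None)?;
    for body in encoded_records {
        // A real writer would consult `F::NEEDS_PREFIX` here and write the
        // single-object prefix before each body when it is `true`.
        writer.write_all(body)?;
    }
    Ok(())
}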

/// Avro Object Container File (OCF) format writer.
#[derive(Debug, Default)]
pub struct AvroOcfFormat {
    sync_marker: [u8; 16],
}

impl AvroFormat for AvroOcfFormat {
    const NEEDS_PREFIX: bool = false;
    fn start_stream<W: Write>(
        &mut self,
        writer: &mut W,
        schema: &Schema,
        compression: Option<CompressionCodec>,
    ) -> Result<(), AvroError> {
        let mut rng = rand::rng();
        rng.fill_bytes(&mut self.sync_marker);
        // Choose the Avro schema JSON that the file will advertise.
        // If `schema.metadata[SCHEMA_METADATA_KEY]` exists, AvroSchema::try_from
        // uses it verbatim; otherwise it is generated from the Arrow schema.
        let avro_schema = AvroSchema::from_arrow_with_options(
            schema,
            Some(AvroSchemaOptions {
                null_order: None,
                strip_metadata: true,
            }),
        )
        .map_err(|e| AvroError::SchemaError(format!("{:?}", e)))?;
        // Magic
        writer.write_all(b"Obj\x01")?;
        // File metadata map: { "avro.schema": <json>, "avro.codec": <codec> }
        let codec_str = match compression {
            Some(CompressionCodec::Deflate) => "deflate",
            Some(CompressionCodec::Snappy) => "snappy",
            Some(CompressionCodec::ZStandard) => "zstandard",
            Some(CompressionCodec::Bzip2) => "bzip2",
            Some(CompressionCodec::Xz) => "xz",
            None => "null",
        };
        // Map block: count=2, then key/value pairs, then terminating count=0
        write_long(writer, 2)?;
        write_string(writer, SCHEMA_METADATA_KEY)?;
        write_bytes(writer, avro_schema.json_string.as_bytes())?;
        write_string(writer, CODEC_METADATA_KEY)?;
        write_bytes(writer, codec_str.as_bytes())?;
        write_long(writer, 0)?;
        // Sync marker (16 bytes)
        writer.write_all(&self.sync_marker)?;
        Ok(())
    }

    fn sync_marker(&self) -> Option<&[u8; 16]> {
        Some(&self.sync_marker)
    }
}
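
// Illustrative sketch only (not part of this module's API): writing the OCF
// header into an in-memory buffer. After `start_stream` the buffer begins with
// the four-byte magic, followed by the two-entry metadata map ("avro.schema"
// and "avro.codec") and the random 16-byte sync marker now reported by
// `sync_marker()`. Assumes the Arrow-to-Avro schema conversion succeeds for
// the supplied schema.
#[allow(dead_code)]
fn ocf_header_example(schema: &Schema) -> Result<Vec<u8>, AvroError> {
    let mut ocf = AvroOcfFormat::default();
    let mut buf = Vec::new();
    ocf.start_stream(&mut buf, schema, None)?;
    debug_assert_eq!(&buf[..4], b"Obj\x01");
    debug_assert!(ocf.sync_marker().is_some());
    Ok(buf)
}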

/// Raw Avro binary streaming format using **Single-Object Encoding** per record.
///
/// Each record written by the stream writer is framed with a prefix determined
/// by the schema fingerprinting algorithm.
///
/// See: <https://avro.apache.org/docs/1.11.1/specification/#single-object-encoding>
/// See: <https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format>
#[derive(Debug, Default)]
pub struct AvroSoeFormat {}

impl AvroFormat for AvroSoeFormat {
    const NEEDS_PREFIX: bool = true;
    fn start_stream<W: Write>(
        &mut self,
        _writer: &mut W,
        _schema: &Schema,
        compression: Option<CompressionCodec>,
    ) -> Result<(), AvroError> {
        if compression.is_some() {
            return Err(AvroError::InvalidArgument(
                "Compression not supported for Avro SOE streaming".to_string(),
            ));
        }
        Ok(())
    }

    fn sync_marker(&self) -> Option<&[u8; 16]> {
        None
    }
}
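
// Illustrative sketch only: the shape of the per-record prefix described in the
// Avro single-object encoding spec linked above. The fingerprint argument is a
// placeholder; the actual prefix is produced by the stream writer from whichever
// schema fingerprinting algorithm it is configured with (the Confluent
// wire format linked above uses a different layout).
#[allow(dead_code)]
fn soe_prefix_example(rabin_fingerprint: u64) -> Vec<u8> {
    let mut prefix = Vec::with_capacity(10);
    // Two-byte single-object marker.
    prefix.extend_from_slice(&[0xC3, 0x01]);
    // Eight-byte little-endian CRC-64-AVRO (Rabin) schema fingerprint.
    prefix.extend_from_slice(&rabin_fingerprint.to_le_bytes());
    prefix
}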

/// Unframed Avro binary streaming format (no per-record prefix, no OCF header).
///
/// Each record written by the stream writer contains only the raw Avro
/// record body bytes (i.e., the Avro binary encoding of the datum) with **no**
/// per-record prefix and **no** Object Container File (OCF) header.
///
/// This format is useful when another transport provides framing (for example,
/// length-delimited buffers) or when embedding Avro record payloads inside a
/// larger envelope.
#[derive(Debug, Default)]
pub struct AvroBinaryFormat;

impl AvroFormat for AvroBinaryFormat {
    const NEEDS_PREFIX: bool = false;

    fn start_stream<W: Write>(
        &mut self,
        _writer: &mut W,
        _schema: &Schema,
        compression: Option<CompressionCodec>,
    ) -> Result<(), AvroError> {
        if compression.is_some() {
            return Err(AvroError::InvalidArgument(
                "Compression not supported for Avro binary streaming".to_string(),
            ));
        }
        Ok(())
    }

    fn sync_marker(&self) -> Option<&[u8; 16]> {
        None
    }
}
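
// Illustrative sketch only: one way an outer transport might frame the
// unprefixed record bodies this format produces, using a hypothetical 4-byte
// big-endian length before each body. The framing scheme is an assumption for
// illustration and is not part of this crate.
#[allow(dead_code)]
fn length_delimited_example<W: Write>(writer: &mut W, body: &[u8]) -> Result<(), AvroError> {
    // Length header supplied by the transport, then the raw Avro body.
    writer.write_all(&(body.len() as u32).to_be_bytes())?;
    writer.write_all(body)?;
    Ok(())
}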

/// Write an Avro `string`: length as an Avro long, then the UTF-8 bytes.
#[inline]
fn write_string<W: Write>(writer: &mut W, s: &str) -> Result<(), AvroError> {
    write_bytes(writer, s.as_bytes())
}

/// Write Avro `bytes`: length as an Avro long, then the raw bytes.
#[inline]
fn write_bytes<W: Write>(writer: &mut W, bytes: &[u8]) -> Result<(), AvroError> {
    write_long(writer, bytes.len() as i64)?;
    writer.write_all(bytes)?;
    Ok(())
}

#[cfg(test)]
mod tests {
    use super::*;
    use arrow_schema::{DataType, Field, Schema};

    fn test_schema() -> Schema {
        Schema::new(vec![Field::new("x", DataType::Int32, false)])
    }

    #[test]
    fn avro_binary_format_rejects_compression() {
        let mut format = AvroBinaryFormat;
        let schema = test_schema();
        let err = format
            .start_stream(
                &mut Vec::<u8>::new(),
                &schema,
                Some(CompressionCodec::Snappy),
            )
            .unwrap_err();
        assert!(
            err.to_string()
                .contains("Compression not supported for Avro binary streaming")
        );
    }

    #[test]
    fn avro_soe_format_rejects_compression() {
        let mut format = AvroSoeFormat::default();
        let schema = test_schema();
        let err = format
            .start_stream(
                &mut Vec::<u8>::new(),
                &schema,
                Some(CompressionCodec::Snappy),
            )
            .unwrap_err();
        assert!(
            err.to_string()
                .contains("Compression not supported for Avro SOE streaming")
        );
    }
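
    // Illustrative addition (not an original test): `write_bytes` emits the
    // byte count as an Avro long (zig-zag varint, so a length of 2 encodes as
    // 0x04) followed by the raw bytes. This assumes `write_long` follows the
    // Avro long encoding, as the OCF metadata map written above requires.
    #[test]
    fn write_bytes_prefixes_length_as_avro_long() {
        let mut buf = Vec::new();
        write_bytes(&mut buf, b"hi").unwrap();
        assert_eq!(buf, vec![0x04, b'h', b'i']);
    }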
}