arrow_avro/writer/
format.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::compression::{CompressionCodec, CODEC_METADATA_KEY};
19use crate::schema::{
20    AvroSchema, Fingerprint, FingerprintAlgorithm, FingerprintStrategy, CONFLUENT_MAGIC,
21    SCHEMA_METADATA_KEY, SINGLE_OBJECT_MAGIC,
22};
23use crate::writer::encoder::write_long;
24use arrow_schema::{ArrowError, Schema};
25use rand::RngCore;
26use std::fmt::Debug;
27use std::io::Write;
28
29/// Format abstraction implemented by each container‐level writer.
30pub trait AvroFormat: Debug + Default {
31    /// If `true`, the writer for this format will query `single_object_prefix()`
32    /// and write the prefix before each record. If `false`, the writer can
33    /// skip this step. This is a performance hint for the writer.
34    const NEEDS_PREFIX: bool;
35
36    /// Write any bytes required at the very beginning of the output stream
37    /// (file header, etc.).
38    /// Implementations **must not** write any record data.
39    fn start_stream<W: Write>(
40        &mut self,
41        writer: &mut W,
42        schema: &Schema,
43        compression: Option<CompressionCodec>,
44    ) -> Result<(), ArrowError>;
45
46    /// Return the 16‑byte sync marker (OCF) or `None` (binary stream).
47    fn sync_marker(&self) -> Option<&[u8; 16]>;
48}
49
/// Writer-side state for the Avro Object Container File (OCF) format.
///
/// A value built through `Default` starts with an all-zero sync marker.
#[derive(Debug, Default)]
pub struct AvroOcfFormat {
    // 16-byte marker written as the last element of the OCF header.
    sync_marker: [u8; 16],
}
55
56impl AvroFormat for AvroOcfFormat {
57    const NEEDS_PREFIX: bool = false;
58    fn start_stream<W: Write>(
59        &mut self,
60        writer: &mut W,
61        schema: &Schema,
62        compression: Option<CompressionCodec>,
63    ) -> Result<(), ArrowError> {
64        let mut rng = rand::rng();
65        rng.fill_bytes(&mut self.sync_marker);
66        // Choose the Avro schema JSON that the file will advertise.
67        // If `schema.metadata[SCHEMA_METADATA_KEY]` exists, AvroSchema::try_from
68        // uses it verbatim; otherwise it is generated from the Arrow schema.
69        let avro_schema = AvroSchema::try_from(schema)?;
70        // Magic
71        writer
72            .write_all(b"Obj\x01")
73            .map_err(|e| ArrowError::IoError(format!("write OCF magic: {e}"), e))?;
74        // File metadata map: { "avro.schema": <json>, "avro.codec": <codec> }
75        let codec_str = match compression {
76            Some(CompressionCodec::Deflate) => "deflate",
77            Some(CompressionCodec::Snappy) => "snappy",
78            Some(CompressionCodec::ZStandard) => "zstandard",
79            Some(CompressionCodec::Bzip2) => "bzip2",
80            Some(CompressionCodec::Xz) => "xz",
81            None => "null",
82        };
83        // Map block: count=2, then key/value pairs, then terminating count=0
84        write_long(writer, 2)?;
85        write_string(writer, SCHEMA_METADATA_KEY)?;
86        write_bytes(writer, avro_schema.json_string.as_bytes())?;
87        write_string(writer, CODEC_METADATA_KEY)?;
88        write_bytes(writer, codec_str.as_bytes())?;
89        write_long(writer, 0)?;
90        // Sync marker (16 bytes)
91        writer
92            .write_all(&self.sync_marker)
93            .map_err(|e| ArrowError::IoError(format!("write OCF sync marker: {e}"), e))?;
94        Ok(())
95    }
96
97    fn sync_marker(&self) -> Option<&[u8; 16]> {
98        Some(&self.sync_marker)
99    }
100}
101
/// Streaming Avro binary format in which every record uses
/// **Single-Object Encoding**.
///
/// The stream writer frames each record with a prefix that depends on the
/// schema fingerprinting algorithm in use.
///
/// See: <https://avro.apache.org/docs/1.11.1/specification/#single-object-encoding>
/// See: <https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format>
#[derive(Debug, Default)]
pub struct AvroBinaryFormat {}
111
112impl AvroFormat for AvroBinaryFormat {
113    const NEEDS_PREFIX: bool = true;
114    fn start_stream<W: Write>(
115        &mut self,
116        _writer: &mut W,
117        _schema: &Schema,
118        compression: Option<CompressionCodec>,
119    ) -> Result<(), ArrowError> {
120        if compression.is_some() {
121            return Err(ArrowError::InvalidArgumentError(
122                "Compression not supported for Avro binary streaming".to_string(),
123            ));
124        }
125
126        Ok(())
127    }
128
129    fn sync_marker(&self) -> Option<&[u8; 16]> {
130        None
131    }
132}
133
134#[inline]
135fn write_string<W: Write>(writer: &mut W, s: &str) -> Result<(), ArrowError> {
136    write_bytes(writer, s.as_bytes())
137}
138
139#[inline]
140fn write_bytes<W: Write>(writer: &mut W, bytes: &[u8]) -> Result<(), ArrowError> {
141    write_long(writer, bytes.len() as i64)?;
142    writer
143        .write_all(bytes)
144        .map_err(|e| ArrowError::IoError(format!("write bytes: {e}"), e))
145}