arrow_avro/writer/format.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Avro Writer Formats for Arrow.

use crate::compression::{CODEC_METADATA_KEY, CompressionCodec};
use crate::schema::{AvroSchema, AvroSchemaOptions, SCHEMA_METADATA_KEY};
use crate::writer::encoder::write_long;
use arrow_schema::{ArrowError, Schema};
use rand::RngCore;
use std::fmt::Debug;
use std::io::Write;

/// Format abstraction implemented by each container-level writer.
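///
/// # Example (illustrative)
///
/// A rough sketch of how a caller is expected to drive a format: write the
/// stream preamble once, then consult `NEEDS_PREFIX` before each record.
/// `records` and the per-record encoding are placeholders for logic that
/// lives in the surrounding writer, not in this trait.
///
/// ```ignore
/// fn write_stream<F: AvroFormat, W: Write>(
///     format: &mut F,
///     out: &mut W,
///     schema: &Schema,
/// ) -> Result<(), ArrowError> {
///     format.start_stream(out, schema, None)?;
///     for record in records {
///         if F::NEEDS_PREFIX {
///             // emit the single-object prefix for this schema here
///         }
///         // encode `record` into its Avro body here
///     }
///     Ok(())
/// }
/// ```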
pub trait AvroFormat: Debug + Default {
    /// If `true`, the writer for this format will query `single_object_prefix()`
    /// and write the prefix before each record. If `false`, the writer can
    /// skip this step. This is a performance hint for the writer.
    const NEEDS_PREFIX: bool;

    /// Write any bytes required at the very beginning of the output stream
    /// (file header, etc.).
    /// Implementations **must not** write any record data.
    fn start_stream<W: Write>(
        &mut self,
        writer: &mut W,
        schema: &Schema,
        compression: Option<CompressionCodec>,
    ) -> Result<(), ArrowError>;

    /// Return the 16-byte sync marker (OCF) or `None` (binary stream).
    fn sync_marker(&self) -> Option<&[u8; 16]>;
}

/// Avro Object Container File (OCF) format writer.
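///
/// `start_stream` writes the OCF header: the 4-byte magic `Obj` + `0x01`, a
/// file metadata map carrying the Avro schema JSON and codec name, and a
/// randomly generated 16-byte sync marker.
///
/// # Example (illustrative)
///
/// A minimal sketch, not compiled as a doc test; it assumes only the items
/// defined in this module plus `arrow_schema`.
///
/// ```ignore
/// use arrow_schema::{DataType, Field, Schema};
///
/// let schema = Schema::new(vec![Field::new("id", DataType::Int64, false)]);
/// let mut buf: Vec<u8> = Vec::new();
/// let mut format = AvroOcfFormat::default();
/// format.start_stream(&mut buf, &schema, None).unwrap();
/// // The header starts with the OCF magic and ends with the sync marker.
/// assert_eq!(&buf[..4], b"Obj\x01");
/// assert_eq!(buf[buf.len() - 16..], format.sync_marker().unwrap()[..]);
/// ```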
#[derive(Debug, Default)]
pub struct AvroOcfFormat {
    sync_marker: [u8; 16],
}

impl AvroFormat for AvroOcfFormat {
    const NEEDS_PREFIX: bool = false;
    fn start_stream<W: Write>(
        &mut self,
        writer: &mut W,
        schema: &Schema,
        compression: Option<CompressionCodec>,
    ) -> Result<(), ArrowError> {
        let mut rng = rand::rng();
        rng.fill_bytes(&mut self.sync_marker);
        // Choose the Avro schema JSON that the file will advertise: if
        // `schema.metadata[SCHEMA_METADATA_KEY]` is present, that JSON is used
        // verbatim; otherwise a schema is generated from the Arrow schema.
        let avro_schema = AvroSchema::from_arrow_with_options(
            schema,
            Some(AvroSchemaOptions {
                null_order: None,
                strip_metadata: true,
            }),
        )?;
        // OCF magic: the ASCII bytes `Obj` followed by the format version byte 1
        writer
            .write_all(b"Obj\x01")
            .map_err(|e| ArrowError::IoError(format!("write OCF magic: {e}"), e))?;
        // File metadata map: { "avro.schema": <json>, "avro.codec": <codec> }
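        // The codec names below are the `avro.codec` values defined by the
        // Avro OCF specification; "null" means the file blocks are uncompressed.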
        let codec_str = match compression {
            Some(CompressionCodec::Deflate) => "deflate",
            Some(CompressionCodec::Snappy) => "snappy",
            Some(CompressionCodec::ZStandard) => "zstandard",
            Some(CompressionCodec::Bzip2) => "bzip2",
            Some(CompressionCodec::Xz) => "xz",
            None => "null",
        };
        // Map block: count=2, then key/value pairs, then terminating count=0
        write_long(writer, 2)?;
        write_string(writer, SCHEMA_METADATA_KEY)?;
        write_bytes(writer, avro_schema.json_string.as_bytes())?;
        write_string(writer, CODEC_METADATA_KEY)?;
        write_bytes(writer, codec_str.as_bytes())?;
        write_long(writer, 0)?;
        // Sync marker (16 bytes)
        writer
            .write_all(&self.sync_marker)
            .map_err(|e| ArrowError::IoError(format!("write OCF sync marker: {e}"), e))?;
        Ok(())
    }

    fn sync_marker(&self) -> Option<&[u8; 16]> {
        Some(&self.sync_marker)
    }
}

/// Raw Avro binary streaming format using **Single-Object Encoding** per record.
///
/// Each record written by the stream writer is framed with a prefix determined
/// by the schema fingerprinting algorithm.
///
/// See: <https://avro.apache.org/docs/1.11.1/specification/#single-object-encoding>
/// See: <https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format>
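///
/// The exact prefix depends on the fingerprinting strategy configured on the
/// writer. Standard Avro single-object encoding uses the two-byte marker
/// `C3 01` followed by the 8-byte little-endian CRC-64-AVRO schema
/// fingerprint, while the Confluent wire format uses a zero byte followed by a
/// 4-byte big-endian schema registry ID:
///
/// ```text
/// Avro SOE:   C3 01 | 8-byte LE CRC-64-AVRO fingerprint | Avro-encoded body
/// Confluent:  00    | 4-byte BE schema registry ID      | Avro-encoded body
/// ```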
#[derive(Debug, Default)]
pub struct AvroSoeFormat {}

impl AvroFormat for AvroSoeFormat {
    const NEEDS_PREFIX: bool = true;
    fn start_stream<W: Write>(
        &mut self,
        _writer: &mut W,
        _schema: &Schema,
        compression: Option<CompressionCodec>,
    ) -> Result<(), ArrowError> {
        if compression.is_some() {
            return Err(ArrowError::InvalidArgumentError(
                "Compression not supported for Avro SOE streaming".to_string(),
            ));
        }
        Ok(())
    }

    fn sync_marker(&self) -> Option<&[u8; 16]> {
        None
    }
}

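/// Writes an Avro `string` value: the UTF-8 length (as an Avro `long`)
/// followed by the raw UTF-8 bytes.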
#[inline]
fn write_string<W: Write>(writer: &mut W, s: &str) -> Result<(), ArrowError> {
    write_bytes(writer, s.as_bytes())
}

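/// Writes an Avro `bytes` value: the length (as an Avro `long`, i.e. a
/// zig-zag varint per the Avro spec) followed by the raw bytes.
///
/// For example, `write_bytes(w, b"abc")` emits `06 61 62 63`: the zig-zag
/// encoding of the length 3 is 6, which fits in a single varint byte.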
#[inline]
fn write_bytes<W: Write>(writer: &mut W, bytes: &[u8]) -> Result<(), ArrowError> {
    write_long(writer, bytes.len() as i64)?;
    writer
        .write_all(bytes)
        .map_err(|e| ArrowError::IoError(format!("write bytes: {e}"), e))
}