arrow_avro/writer/
format.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::compression::{CompressionCodec, CODEC_METADATA_KEY};
19use crate::schema::{AvroSchema, SCHEMA_METADATA_KEY};
20use crate::writer::encoder::{write_long, EncoderOptions};
21use arrow_schema::{ArrowError, Schema};
22use rand::RngCore;
23use serde_json::{Map as JsonMap, Value as JsonValue};
24use std::fmt::Debug;
25use std::io::Write;
26
27/// Format abstraction implemented by each container‐level writer.
28pub trait AvroFormat: Debug + Default {
29    /// Write any bytes required at the very beginning of the output stream
30    /// (file header, etc.).
31    /// Implementations **must not** write any record data.
32    fn start_stream<W: Write>(
33        &mut self,
34        writer: &mut W,
35        schema: &Schema,
36        compression: Option<CompressionCodec>,
37    ) -> Result<(), ArrowError>;
38
39    /// Return the 16‑byte sync marker (OCF) or `None` (binary stream).
40    fn sync_marker(&self) -> Option<&[u8; 16]>;
41}
42
43/// Avro Object Container File (OCF) format writer.
44#[derive(Debug, Default)]
45pub struct AvroOcfFormat {
46    sync_marker: [u8; 16],
47    /// Optional encoder behavior hints to keep file header schema ordering
48    /// consistent with value encoding (e.g. Impala null-second).
49    encoder_options: EncoderOptions,
50}
51
52impl AvroOcfFormat {
53    /// Optional helper to attach encoder options (i.e., Impala null-second) to the format.
54    #[allow(dead_code)]
55    pub fn with_encoder_options(mut self, opts: EncoderOptions) -> Self {
56        self.encoder_options = opts;
57        self
58    }
59
60    /// Access the options used by this format.
61    #[allow(dead_code)]
62    pub fn encoder_options(&self) -> &EncoderOptions {
63        &self.encoder_options
64    }
65}
66
67impl AvroFormat for AvroOcfFormat {
68    fn start_stream<W: Write>(
69        &mut self,
70        writer: &mut W,
71        schema: &Schema,
72        compression: Option<CompressionCodec>,
73    ) -> Result<(), ArrowError> {
74        let mut rng = rand::rng();
75        rng.fill_bytes(&mut self.sync_marker);
76        let avro_schema = AvroSchema::try_from(schema)?;
77        writer
78            .write_all(b"Obj\x01")
79            .map_err(|e| ArrowError::IoError(format!("write OCF magic: {e}"), e))?;
80        let codec_str = match compression {
81            Some(CompressionCodec::Deflate) => "deflate",
82            Some(CompressionCodec::Snappy) => "snappy",
83            Some(CompressionCodec::ZStandard) => "zstandard",
84            Some(CompressionCodec::Bzip2) => "bzip2",
85            Some(CompressionCodec::Xz) => "xz",
86            None => "null",
87        };
88        write_long(writer, 2)?;
89        write_string(writer, SCHEMA_METADATA_KEY)?;
90        write_bytes(writer, avro_schema.json_string.as_bytes())?;
91        write_string(writer, CODEC_METADATA_KEY)?;
92        write_bytes(writer, codec_str.as_bytes())?;
93        write_long(writer, 0)?;
94        // Sync marker (16 bytes)
95        writer
96            .write_all(&self.sync_marker)
97            .map_err(|e| ArrowError::IoError(format!("write OCF sync marker: {e}"), e))?;
98
99        Ok(())
100    }
101
102    fn sync_marker(&self) -> Option<&[u8; 16]> {
103        Some(&self.sync_marker)
104    }
105}
106
/// Marker type for the raw Avro binary streaming format, which has neither
/// a header nor a footer.
#[derive(Default, Debug)]
pub struct AvroBinaryFormat;
110
111impl AvroFormat for AvroBinaryFormat {
112    fn start_stream<W: Write>(
113        &mut self,
114        _writer: &mut W,
115        _schema: &Schema,
116        _compression: Option<CompressionCodec>,
117    ) -> Result<(), ArrowError> {
118        Err(ArrowError::NotYetImplemented(
119            "avro binary format not yet implemented".to_string(),
120        ))
121    }
122
123    fn sync_marker(&self) -> Option<&[u8; 16]> {
124        None
125    }
126}
127
128#[inline]
129fn write_string<W: Write>(writer: &mut W, s: &str) -> Result<(), ArrowError> {
130    write_bytes(writer, s.as_bytes())
131}
132
133#[inline]
134fn write_bytes<W: Write>(writer: &mut W, bytes: &[u8]) -> Result<(), ArrowError> {
135    write_long(writer, bytes.len() as i64)?;
136    writer
137        .write_all(bytes)
138        .map_err(|e| ArrowError::IoError(format!("write bytes: {e}"), e))
139}