arrow_avro/writer/
format.rs1use crate::compression::{CODEC_METADATA_KEY, CompressionCodec};
21use crate::errors::AvroError;
22use crate::schema::{AvroSchema, AvroSchemaOptions, SCHEMA_METADATA_KEY};
23use crate::writer::encoder::write_long;
24use arrow_schema::Schema;
25use rand::RngCore;
26use std::fmt::Debug;
27use std::io::Write;
28
29pub trait AvroFormat: Debug + Default {
31 const NEEDS_PREFIX: bool;
35
36 fn start_stream<W: Write>(
40 &mut self,
41 writer: &mut W,
42 schema: &Schema,
43 compression: Option<CompressionCodec>,
44 ) -> Result<(), AvroError>;
45
46 fn sync_marker(&self) -> Option<&[u8; 16]>;
48}
49
50#[derive(Debug, Default)]
52pub struct AvroOcfFormat {
53 sync_marker: [u8; 16],
54}
55
56impl AvroFormat for AvroOcfFormat {
57 const NEEDS_PREFIX: bool = false;
58 fn start_stream<W: Write>(
59 &mut self,
60 writer: &mut W,
61 schema: &Schema,
62 compression: Option<CompressionCodec>,
63 ) -> Result<(), AvroError> {
64 let mut rng = rand::rng();
65 rng.fill_bytes(&mut self.sync_marker);
66 let avro_schema = AvroSchema::from_arrow_with_options(
70 schema,
71 Some(AvroSchemaOptions {
72 null_order: None,
73 strip_metadata: true,
74 }),
75 )
76 .map_err(|e| AvroError::SchemaError(format!("{:?}", e)))?;
77 writer.write_all(b"Obj\x01")?;
79 let codec_str = match compression {
81 Some(CompressionCodec::Deflate) => "deflate",
82 Some(CompressionCodec::Snappy) => "snappy",
83 Some(CompressionCodec::ZStandard) => "zstandard",
84 Some(CompressionCodec::Bzip2) => "bzip2",
85 Some(CompressionCodec::Xz) => "xz",
86 None => "null",
87 };
88 write_long(writer, 2)?;
90 write_string(writer, SCHEMA_METADATA_KEY)?;
91 write_bytes(writer, avro_schema.json_string.as_bytes())?;
92 write_string(writer, CODEC_METADATA_KEY)?;
93 write_bytes(writer, codec_str.as_bytes())?;
94 write_long(writer, 0)?;
95 writer.write_all(&self.sync_marker)?;
97 Ok(())
98 }
99
100 fn sync_marker(&self) -> Option<&[u8; 16]> {
101 Some(&self.sync_marker)
102 }
103}
104
105#[derive(Debug, Default)]
113pub struct AvroSoeFormat {}
114
115impl AvroFormat for AvroSoeFormat {
116 const NEEDS_PREFIX: bool = true;
117 fn start_stream<W: Write>(
118 &mut self,
119 _writer: &mut W,
120 _schema: &Schema,
121 compression: Option<CompressionCodec>,
122 ) -> Result<(), AvroError> {
123 if compression.is_some() {
124 return Err(AvroError::InvalidArgument(
125 "Compression not supported for Avro SOE streaming".to_string(),
126 ));
127 }
128 Ok(())
129 }
130
131 fn sync_marker(&self) -> Option<&[u8; 16]> {
132 None
133 }
134}
135
136#[derive(Debug, Default)]
146pub struct AvroBinaryFormat;
147
148impl AvroFormat for AvroBinaryFormat {
149 const NEEDS_PREFIX: bool = false;
150
151 fn start_stream<W: Write>(
152 &mut self,
153 _writer: &mut W,
154 _schema: &Schema,
155 compression: Option<CompressionCodec>,
156 ) -> Result<(), AvroError> {
157 if compression.is_some() {
158 return Err(AvroError::InvalidArgument(
159 "Compression not supported for Avro binary streaming".to_string(),
160 ));
161 }
162 Ok(())
163 }
164
165 fn sync_marker(&self) -> Option<&[u8; 16]> {
166 None
167 }
168}
169
170#[inline]
171fn write_string<W: Write>(writer: &mut W, s: &str) -> Result<(), AvroError> {
172 write_bytes(writer, s.as_bytes())
173}
174
175#[inline]
176fn write_bytes<W: Write>(writer: &mut W, bytes: &[u8]) -> Result<(), AvroError> {
177 write_long(writer, bytes.len() as i64)?;
178 writer.write_all(bytes)?;
179 Ok(())
180}
181
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_schema::{DataType, Field, Schema};

    /// Minimal single-column schema shared by the tests below.
    fn test_schema() -> Schema {
        let fields = vec![Field::new("x", DataType::Int32, false)];
        Schema::new(fields)
    }

    /// Starts a Snappy-compressed stream on `format` and returns the text of
    /// the error it must produce; panics if the call unexpectedly succeeds.
    fn compression_error<F: AvroFormat>(mut format: F) -> String {
        let schema = test_schema();
        let mut sink = Vec::<u8>::new();
        format
            .start_stream(&mut sink, &schema, Some(CompressionCodec::Snappy))
            .unwrap_err()
            .to_string()
    }

    #[test]
    fn avro_binary_format_rejects_compression() {
        let message = compression_error(AvroBinaryFormat);
        assert!(message.contains("Compression not supported for Avro binary streaming"));
    }

    #[test]
    fn avro_soe_format_rejects_compression() {
        let message = compression_error(AvroSoeFormat::default());
        assert!(message.contains("Compression not supported for Avro SOE streaming"));
    }
}