arrow_avro/writer/
format.rs1use crate::compression::{CODEC_METADATA_KEY, CompressionCodec};
21use crate::schema::{AvroSchema, AvroSchemaOptions, SCHEMA_METADATA_KEY};
22use crate::writer::encoder::write_long;
23use arrow_schema::{ArrowError, Schema};
24use rand::RngCore;
25use std::fmt::Debug;
26use std::io::Write;
27
28pub trait AvroFormat: Debug + Default {
30 const NEEDS_PREFIX: bool;
34
35 fn start_stream<W: Write>(
39 &mut self,
40 writer: &mut W,
41 schema: &Schema,
42 compression: Option<CompressionCodec>,
43 ) -> Result<(), ArrowError>;
44
45 fn sync_marker(&self) -> Option<&[u8; 16]>;
47}
48
/// Avro Object Container File (OCF) output format.
///
/// `start_stream` writes three things in order: the OCF magic bytes, a metadata map
/// (schema and codec), and a freshly generated random sync marker.
#[derive(Debug, Default)]
pub struct AvroOcfFormat {
    /// Block sync marker. It is all zeros (the `Default`) until `start_stream` fills it
    /// with random bytes.
    sync_marker: [u8; 16],
}
54
55impl AvroFormat for AvroOcfFormat {
56 const NEEDS_PREFIX: bool = false;
57 fn start_stream<W: Write>(
58 &mut self,
59 writer: &mut W,
60 schema: &Schema,
61 compression: Option<CompressionCodec>,
62 ) -> Result<(), ArrowError> {
63 let mut rng = rand::rng();
64 rng.fill_bytes(&mut self.sync_marker);
65 let avro_schema = AvroSchema::from_arrow_with_options(
69 schema,
70 Some(AvroSchemaOptions {
71 null_order: None,
72 strip_metadata: true,
73 }),
74 )?;
75 writer
77 .write_all(b"Obj\x01")
78 .map_err(|e| ArrowError::IoError(format!("write OCF magic: {e}"), e))?;
79 let codec_str = match compression {
81 Some(CompressionCodec::Deflate) => "deflate",
82 Some(CompressionCodec::Snappy) => "snappy",
83 Some(CompressionCodec::ZStandard) => "zstandard",
84 Some(CompressionCodec::Bzip2) => "bzip2",
85 Some(CompressionCodec::Xz) => "xz",
86 None => "null",
87 };
88 write_long(writer, 2)?;
90 write_string(writer, SCHEMA_METADATA_KEY)?;
91 write_bytes(writer, avro_schema.json_string.as_bytes())?;
92 write_string(writer, CODEC_METADATA_KEY)?;
93 write_bytes(writer, codec_str.as_bytes())?;
94 write_long(writer, 0)?;
95 writer
97 .write_all(&self.sync_marker)
98 .map_err(|e| ArrowError::IoError(format!("write OCF sync marker: {e}"), e))?;
99 Ok(())
100 }
101
102 fn sync_marker(&self) -> Option<&[u8; 16]> {
103 Some(&self.sync_marker)
104 }
105}
106
/// Avro SOE streaming format (by its name, Avro single-object encoding).
///
/// No stream header is written and no sync marker is used. Requesting compression makes
/// `start_stream` fail.
#[derive(Debug, Default)]
pub struct AvroSoeFormat {}
116
117impl AvroFormat for AvroSoeFormat {
118 const NEEDS_PREFIX: bool = true;
119 fn start_stream<W: Write>(
120 &mut self,
121 _writer: &mut W,
122 _schema: &Schema,
123 compression: Option<CompressionCodec>,
124 ) -> Result<(), ArrowError> {
125 if compression.is_some() {
126 return Err(ArrowError::InvalidArgumentError(
127 "Compression not supported for Avro SOE streaming".to_string(),
128 ));
129 }
130 Ok(())
131 }
132
133 fn sync_marker(&self) -> Option<&[u8; 16]> {
134 None
135 }
136}
137
138#[inline]
139fn write_string<W: Write>(writer: &mut W, s: &str) -> Result<(), ArrowError> {
140 write_bytes(writer, s.as_bytes())
141}
142
143#[inline]
144fn write_bytes<W: Write>(writer: &mut W, bytes: &[u8]) -> Result<(), ArrowError> {
145 write_long(writer, bytes.len() as i64)?;
146 writer
147 .write_all(bytes)
148 .map_err(|e| ArrowError::IoError(format!("write bytes: {e}"), e))
149}