arrow_avro/writer/
format.rs1use crate::compression::{CompressionCodec, CODEC_METADATA_KEY};
19use crate::schema::{
20 AvroSchema, Fingerprint, FingerprintAlgorithm, FingerprintStrategy, CONFLUENT_MAGIC,
21 SCHEMA_METADATA_KEY, SINGLE_OBJECT_MAGIC,
22};
23use crate::writer::encoder::write_long;
24use arrow_schema::{ArrowError, Schema};
25use rand::RngCore;
26use std::fmt::Debug;
27use std::io::Write;
28
29pub trait AvroFormat: Debug + Default {
31 const NEEDS_PREFIX: bool;
35
36 fn start_stream<W: Write>(
40 &mut self,
41 writer: &mut W,
42 schema: &Schema,
43 compression: Option<CompressionCodec>,
44 ) -> Result<(), ArrowError>;
45
46 fn sync_marker(&self) -> Option<&[u8; 16]>;
48}
49
/// Writer format for Avro Object Container Files (OCF).
///
/// The only state is the 16-byte sync marker written at the end of the file header.
#[derive(Default, Debug)]
pub struct AvroOcfFormat {
    /// Sync marker for the current file. It is all zeros (the `Default`) until a
    /// stream is started and it is filled with random bytes.
    sync_marker: [u8; 16],
}
55
56impl AvroFormat for AvroOcfFormat {
57 const NEEDS_PREFIX: bool = false;
58 fn start_stream<W: Write>(
59 &mut self,
60 writer: &mut W,
61 schema: &Schema,
62 compression: Option<CompressionCodec>,
63 ) -> Result<(), ArrowError> {
64 let mut rng = rand::rng();
65 rng.fill_bytes(&mut self.sync_marker);
66 let avro_schema = AvroSchema::try_from(schema)?;
70 writer
72 .write_all(b"Obj\x01")
73 .map_err(|e| ArrowError::IoError(format!("write OCF magic: {e}"), e))?;
74 let codec_str = match compression {
76 Some(CompressionCodec::Deflate) => "deflate",
77 Some(CompressionCodec::Snappy) => "snappy",
78 Some(CompressionCodec::ZStandard) => "zstandard",
79 Some(CompressionCodec::Bzip2) => "bzip2",
80 Some(CompressionCodec::Xz) => "xz",
81 None => "null",
82 };
83 write_long(writer, 2)?;
85 write_string(writer, SCHEMA_METADATA_KEY)?;
86 write_bytes(writer, avro_schema.json_string.as_bytes())?;
87 write_string(writer, CODEC_METADATA_KEY)?;
88 write_bytes(writer, codec_str.as_bytes())?;
89 write_long(writer, 0)?;
90 writer
92 .write_all(&self.sync_marker)
93 .map_err(|e| ArrowError::IoError(format!("write OCF sync marker: {e}"), e))?;
94 Ok(())
95 }
96
97 fn sync_marker(&self) -> Option<&[u8; 16]> {
98 Some(&self.sync_marker)
99 }
100}
101
/// Writer format for raw Avro binary streams, without a container-file header.
///
/// This type is stateless. As implemented in this module:
/// - starting a stream writes nothing;
/// - the format has no sync marker;
/// - any requested compression codec is rejected.
#[derive(Default, Debug)]
pub struct AvroBinaryFormat {}
111
112impl AvroFormat for AvroBinaryFormat {
113 const NEEDS_PREFIX: bool = true;
114 fn start_stream<W: Write>(
115 &mut self,
116 _writer: &mut W,
117 _schema: &Schema,
118 compression: Option<CompressionCodec>,
119 ) -> Result<(), ArrowError> {
120 if compression.is_some() {
121 return Err(ArrowError::InvalidArgumentError(
122 "Compression not supported for Avro binary streaming".to_string(),
123 ));
124 }
125
126 Ok(())
127 }
128
129 fn sync_marker(&self) -> Option<&[u8; 16]> {
130 None
131 }
132}
133
134#[inline]
135fn write_string<W: Write>(writer: &mut W, s: &str) -> Result<(), ArrowError> {
136 write_bytes(writer, s.as_bytes())
137}
138
139#[inline]
140fn write_bytes<W: Write>(writer: &mut W, bytes: &[u8]) -> Result<(), ArrowError> {
141 write_long(writer, bytes.len() as i64)?;
142 writer
143 .write_all(bytes)
144 .map_err(|e| ArrowError::IoError(format!("write bytes: {e}"), e))
145}