arrow_avro/writer/
format.rs1use crate::compression::{CompressionCodec, CODEC_METADATA_KEY};
19use crate::schema::{AvroSchema, SCHEMA_METADATA_KEY};
20use crate::writer::encoder::{write_long, EncoderOptions};
21use arrow_schema::{ArrowError, Schema};
22use rand::RngCore;
23use serde_json::{Map as JsonMap, Value as JsonValue};
24use std::fmt::Debug;
25use std::io::Write;
26
27pub trait AvroFormat: Debug + Default {
29 fn start_stream<W: Write>(
33 &mut self,
34 writer: &mut W,
35 schema: &Schema,
36 compression: Option<CompressionCodec>,
37 ) -> Result<(), ArrowError>;
38
39 fn sync_marker(&self) -> Option<&[u8; 16]>;
41}
42
43#[derive(Debug, Default)]
45pub struct AvroOcfFormat {
46 sync_marker: [u8; 16],
47 encoder_options: EncoderOptions,
50}
51
52impl AvroOcfFormat {
53 #[allow(dead_code)]
55 pub fn with_encoder_options(mut self, opts: EncoderOptions) -> Self {
56 self.encoder_options = opts;
57 self
58 }
59
60 #[allow(dead_code)]
62 pub fn encoder_options(&self) -> &EncoderOptions {
63 &self.encoder_options
64 }
65}
66
67impl AvroFormat for AvroOcfFormat {
68 fn start_stream<W: Write>(
69 &mut self,
70 writer: &mut W,
71 schema: &Schema,
72 compression: Option<CompressionCodec>,
73 ) -> Result<(), ArrowError> {
74 let mut rng = rand::rng();
75 rng.fill_bytes(&mut self.sync_marker);
76 let avro_schema = AvroSchema::try_from(schema)?;
77 writer
78 .write_all(b"Obj\x01")
79 .map_err(|e| ArrowError::IoError(format!("write OCF magic: {e}"), e))?;
80 let codec_str = match compression {
81 Some(CompressionCodec::Deflate) => "deflate",
82 Some(CompressionCodec::Snappy) => "snappy",
83 Some(CompressionCodec::ZStandard) => "zstandard",
84 Some(CompressionCodec::Bzip2) => "bzip2",
85 Some(CompressionCodec::Xz) => "xz",
86 None => "null",
87 };
88 write_long(writer, 2)?;
89 write_string(writer, SCHEMA_METADATA_KEY)?;
90 write_bytes(writer, avro_schema.json_string.as_bytes())?;
91 write_string(writer, CODEC_METADATA_KEY)?;
92 write_bytes(writer, codec_str.as_bytes())?;
93 write_long(writer, 0)?;
94 writer
96 .write_all(&self.sync_marker)
97 .map_err(|e| ArrowError::IoError(format!("write OCF sync marker: {e}"), e))?;
98
99 Ok(())
100 }
101
102 fn sync_marker(&self) -> Option<&[u8; 16]> {
103 Some(&self.sync_marker)
104 }
105}
106
/// Avro framing without the Object Container File wrapper.
///
/// NOTE(review): presumably meant for bare binary-encoded records — TODO
/// confirm. Not implemented yet: its `start_stream` always returns
/// `ArrowError::NotYetImplemented`, and it reports no sync marker.
#[derive(Default, Debug)]
pub struct AvroBinaryFormat;
110
111impl AvroFormat for AvroBinaryFormat {
112 fn start_stream<W: Write>(
113 &mut self,
114 _writer: &mut W,
115 _schema: &Schema,
116 _compression: Option<CompressionCodec>,
117 ) -> Result<(), ArrowError> {
118 Err(ArrowError::NotYetImplemented(
119 "avro binary format not yet implemented".to_string(),
120 ))
121 }
122
123 fn sync_marker(&self) -> Option<&[u8; 16]> {
124 None
125 }
126}
127
128#[inline]
129fn write_string<W: Write>(writer: &mut W, s: &str) -> Result<(), ArrowError> {
130 write_bytes(writer, s.as_bytes())
131}
132
133#[inline]
134fn write_bytes<W: Write>(writer: &mut W, bytes: &[u8]) -> Result<(), ArrowError> {
135 write_long(writer, bytes.len() as i64)?;
136 writer
137 .write_all(bytes)
138 .map_err(|e| ArrowError::IoError(format!("write bytes: {e}"), e))
139}