arrow_avro/
compression.rs1use crate::errors::AvroError;
19use arrow_schema::ArrowError;
20#[cfg(any(
21 feature = "deflate",
22 feature = "zstd",
23 feature = "bzip2",
24 feature = "xz"
25))]
26use std::io::{Read, Write};
27
/// Key of the file-header metadata entry that names the block compression
/// codec, as defined by the Avro object container file specification.
pub const CODEC_METADATA_KEY: &str = "avro.codec";
/// Compression codecs that can be applied to Avro data blocks.
///
/// Each variant selects the algorithm used by `CompressionCodec::decompress`
/// and `CompressionCodec::compress`. When the Cargo feature backing a codec
/// is disabled, those methods return an error instead of (de)compressing.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum CompressionCodec {
    /// Raw DEFLATE, implemented with `flate2` (feature `deflate`).
    Deflate,
    /// Snappy, implemented with `snap`; each block carries a trailing 4-byte
    /// big-endian CRC-32 of the uncompressed data (feature `snappy`).
    Snappy,
    /// Zstandard, implemented with `zstd` (feature `zstd`).
    ZStandard,
    /// Bzip2, implemented with `bzip2` (feature `bzip2`).
    Bzip2,
    /// XZ, implemented with `xz` (feature `xz`).
    Xz,
}
48
49impl CompressionCodec {
50 #[allow(unused_variables)]
51 pub(crate) fn decompress(&self, block: &[u8]) -> Result<Vec<u8>, AvroError> {
52 match self {
53 #[cfg(feature = "deflate")]
54 CompressionCodec::Deflate => {
55 let mut decoder = flate2::read::DeflateDecoder::new(block);
56 let mut out = Vec::new();
57 decoder.read_to_end(&mut out)?;
58 Ok(out)
59 }
60 #[cfg(not(feature = "deflate"))]
61 CompressionCodec::Deflate => Err(AvroError::ParseError(
62 "Deflate codec requires deflate feature".to_string(),
63 )),
64 #[cfg(feature = "snappy")]
65 CompressionCodec::Snappy => {
66 let crc = &block[block.len() - 4..];
69 let block = &block[..block.len() - 4];
70
71 let mut decoder = snap::raw::Decoder::new();
72 let decoded = decoder
73 .decompress_vec(block)
74 .map_err(|e| AvroError::External(Box::new(e)))?;
75
76 let checksum = crc::Crc::<u32>::new(&crc::CRC_32_ISO_HDLC).checksum(&decoded);
77 if checksum != u32::from_be_bytes(crc.try_into().unwrap()) {
78 return Err(AvroError::ParseError("Snappy CRC mismatch".to_string()));
79 }
80 Ok(decoded)
81 }
82 #[cfg(not(feature = "snappy"))]
83 CompressionCodec::Snappy => Err(AvroError::ParseError(
84 "Snappy codec requires snappy feature".to_string(),
85 )),
86
87 #[cfg(feature = "zstd")]
88 CompressionCodec::ZStandard => {
89 let mut decoder = zstd::Decoder::new(block)?;
90 let mut out = Vec::new();
91 decoder
92 .read_to_end(&mut out)
93 .map_err(|e| AvroError::External(Box::new(e)))?;
94 Ok(out)
95 }
96 #[cfg(not(feature = "zstd"))]
97 CompressionCodec::ZStandard => Err(AvroError::ParseError(
98 "ZStandard codec requires zstd feature".to_string(),
99 )),
100 #[cfg(feature = "bzip2")]
101 CompressionCodec::Bzip2 => {
102 let mut decoder = bzip2::read::BzDecoder::new(block);
103 let mut out = Vec::new();
104 decoder
105 .read_to_end(&mut out)
106 .map_err(|e| AvroError::External(Box::new(e)))?;
107 Ok(out)
108 }
109 #[cfg(not(feature = "bzip2"))]
110 CompressionCodec::Bzip2 => Err(AvroError::ParseError(
111 "Bzip2 codec requires bzip2 feature".to_string(),
112 )),
113 #[cfg(feature = "xz")]
114 CompressionCodec::Xz => {
115 let mut decoder = xz::read::XzDecoder::new(block);
116 let mut out = Vec::new();
117 decoder
118 .read_to_end(&mut out)
119 .map_err(|e| AvroError::External(Box::new(e)))?;
120 Ok(out)
121 }
122 #[cfg(not(feature = "xz"))]
123 CompressionCodec::Xz => Err(AvroError::ParseError(
124 "XZ codec requires xz feature".to_string(),
125 )),
126 }
127 }
128
129 #[allow(unused_variables)]
130 pub(crate) fn compress(&self, data: &[u8]) -> Result<Vec<u8>, ArrowError> {
131 match self {
132 #[cfg(feature = "deflate")]
133 CompressionCodec::Deflate => {
134 let mut encoder =
135 flate2::write::DeflateEncoder::new(Vec::new(), flate2::Compression::default());
136 encoder.write_all(data)?;
137 let compressed = encoder.finish()?;
138 Ok(compressed)
139 }
140 #[cfg(not(feature = "deflate"))]
141 CompressionCodec::Deflate => Err(ArrowError::ParseError(
142 "Deflate codec requires deflate feature".to_string(),
143 )),
144
145 #[cfg(feature = "snappy")]
146 CompressionCodec::Snappy => {
147 let mut encoder = snap::raw::Encoder::new();
148 let mut compressed = encoder
150 .compress_vec(data)
151 .map_err(|e| ArrowError::ExternalError(Box::new(e)))?;
152 let crc_val = crc::Crc::<u32>::new(&crc::CRC_32_ISO_HDLC).checksum(data);
154 compressed.extend_from_slice(&crc_val.to_be_bytes());
155 Ok(compressed)
156 }
157 #[cfg(not(feature = "snappy"))]
158 CompressionCodec::Snappy => Err(ArrowError::ParseError(
159 "Snappy codec requires snappy feature".to_string(),
160 )),
161
162 #[cfg(feature = "zstd")]
163 CompressionCodec::ZStandard => {
164 let mut encoder = zstd::Encoder::new(Vec::new(), 0)
165 .map_err(|e| ArrowError::ExternalError(Box::new(e)))?;
166 encoder.write_all(data)?;
167 let compressed = encoder
168 .finish()
169 .map_err(|e| ArrowError::ExternalError(Box::new(e)))?;
170 Ok(compressed)
171 }
172 #[cfg(not(feature = "zstd"))]
173 CompressionCodec::ZStandard => Err(ArrowError::ParseError(
174 "ZStandard codec requires zstd feature".to_string(),
175 )),
176
177 #[cfg(feature = "bzip2")]
178 CompressionCodec::Bzip2 => {
179 let mut encoder =
180 bzip2::write::BzEncoder::new(Vec::new(), bzip2::Compression::default());
181 encoder.write_all(data)?;
182 let compressed = encoder.finish()?;
183 Ok(compressed)
184 }
185 #[cfg(not(feature = "bzip2"))]
186 CompressionCodec::Bzip2 => Err(ArrowError::ParseError(
187 "Bzip2 codec requires bzip2 feature".to_string(),
188 )),
189 #[cfg(feature = "xz")]
190 CompressionCodec::Xz => {
191 let mut encoder = xz::write::XzEncoder::new(Vec::new(), 6);
192 encoder.write_all(data)?;
193 let compressed = encoder.finish()?;
194 Ok(compressed)
195 }
196 #[cfg(not(feature = "xz"))]
197 CompressionCodec::Xz => Err(ArrowError::ParseError(
198 "XZ codec requires xz feature".to_string(),
199 )),
200 }
201 }
202}