arrow_avro/
compression.rs1use arrow_schema::ArrowError;
19#[cfg(any(
20 feature = "deflate",
21 feature = "zstd",
22 feature = "bzip2",
23 feature = "xz"
24))]
25use std::io::{Read, Write};
26
/// Metadata key string `"avro.codec"`.
///
/// NOTE(review): presumably the metadata key under which the name of the
/// compression codec is stored; confirm against the reader/writer code that
/// consumes this constant.
pub const CODEC_METADATA_KEY: &str = "avro.codec";
29
/// Compression codecs that can be applied to a block of bytes.
///
/// Every variant is always present in the enum; whether it can actually be
/// used depends on the matching cargo feature. With the feature disabled,
/// `compress` and `decompress` return an error for that variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompressionCodec {
    /// DEFLATE, handled with `flate2`'s `DeflateDecoder` / `DeflateEncoder`.
    Deflate,
    /// Raw Snappy data followed by a 4-byte big-endian CRC32 of the
    /// uncompressed bytes.
    Snappy,
    /// Zstandard, handled with the `zstd` crate.
    ZStandard,
    /// Bzip2, handled with the `bzip2` crate.
    Bzip2,
    /// XZ, handled with the `xz` crate.
    Xz,
}
47
48impl CompressionCodec {
49 #[allow(unused_variables)]
50 pub(crate) fn decompress(&self, block: &[u8]) -> Result<Vec<u8>, ArrowError> {
51 match self {
52 #[cfg(feature = "deflate")]
53 CompressionCodec::Deflate => {
54 let mut decoder = flate2::read::DeflateDecoder::new(block);
55 let mut out = Vec::new();
56 decoder.read_to_end(&mut out)?;
57 Ok(out)
58 }
59 #[cfg(not(feature = "deflate"))]
60 CompressionCodec::Deflate => Err(ArrowError::ParseError(
61 "Deflate codec requires deflate feature".to_string(),
62 )),
63 #[cfg(feature = "snappy")]
64 CompressionCodec::Snappy => {
65 let crc = &block[block.len() - 4..];
68 let block = &block[..block.len() - 4];
69
70 let mut decoder = snap::raw::Decoder::new();
71 let decoded = decoder
72 .decompress_vec(block)
73 .map_err(|e| ArrowError::ExternalError(Box::new(e)))?;
74
75 let checksum = crc::Crc::<u32>::new(&crc::CRC_32_ISO_HDLC).checksum(&decoded);
76 if checksum != u32::from_be_bytes(crc.try_into().unwrap()) {
77 return Err(ArrowError::ParseError("Snappy CRC mismatch".to_string()));
78 }
79 Ok(decoded)
80 }
81 #[cfg(not(feature = "snappy"))]
82 CompressionCodec::Snappy => Err(ArrowError::ParseError(
83 "Snappy codec requires snappy feature".to_string(),
84 )),
85
86 #[cfg(feature = "zstd")]
87 CompressionCodec::ZStandard => {
88 let mut decoder = zstd::Decoder::new(block)?;
89 let mut out = Vec::new();
90 decoder.read_to_end(&mut out)?;
91 Ok(out)
92 }
93 #[cfg(not(feature = "zstd"))]
94 CompressionCodec::ZStandard => Err(ArrowError::ParseError(
95 "ZStandard codec requires zstd feature".to_string(),
96 )),
97 #[cfg(feature = "bzip2")]
98 CompressionCodec::Bzip2 => {
99 let mut decoder = bzip2::read::BzDecoder::new(block);
100 let mut out = Vec::new();
101 decoder.read_to_end(&mut out)?;
102 Ok(out)
103 }
104 #[cfg(not(feature = "bzip2"))]
105 CompressionCodec::Bzip2 => Err(ArrowError::ParseError(
106 "Bzip2 codec requires bzip2 feature".to_string(),
107 )),
108 #[cfg(feature = "xz")]
109 CompressionCodec::Xz => {
110 let mut decoder = xz::read::XzDecoder::new(block);
111 let mut out = Vec::new();
112 decoder.read_to_end(&mut out)?;
113 Ok(out)
114 }
115 #[cfg(not(feature = "xz"))]
116 CompressionCodec::Xz => Err(ArrowError::ParseError(
117 "XZ codec requires xz feature".to_string(),
118 )),
119 }
120 }
121
122 #[allow(unused_variables)]
123 pub(crate) fn compress(&self, data: &[u8]) -> Result<Vec<u8>, ArrowError> {
124 match self {
125 #[cfg(feature = "deflate")]
126 CompressionCodec::Deflate => {
127 let mut encoder =
128 flate2::write::DeflateEncoder::new(Vec::new(), flate2::Compression::default());
129 encoder.write_all(data)?;
130 let compressed = encoder.finish()?;
131 Ok(compressed)
132 }
133 #[cfg(not(feature = "deflate"))]
134 CompressionCodec::Deflate => Err(ArrowError::ParseError(
135 "Deflate codec requires deflate feature".to_string(),
136 )),
137
138 #[cfg(feature = "snappy")]
139 CompressionCodec::Snappy => {
140 let mut encoder = snap::raw::Encoder::new();
141 let mut compressed = encoder
143 .compress_vec(data)
144 .map_err(|e| ArrowError::ExternalError(Box::new(e)))?;
145 let crc_val = crc::Crc::<u32>::new(&crc::CRC_32_ISO_HDLC).checksum(data);
147 compressed.extend_from_slice(&crc_val.to_be_bytes());
148 Ok(compressed)
149 }
150 #[cfg(not(feature = "snappy"))]
151 CompressionCodec::Snappy => Err(ArrowError::ParseError(
152 "Snappy codec requires snappy feature".to_string(),
153 )),
154
155 #[cfg(feature = "zstd")]
156 CompressionCodec::ZStandard => {
157 let mut encoder = zstd::Encoder::new(Vec::new(), 0)
158 .map_err(|e| ArrowError::ExternalError(Box::new(e)))?;
159 encoder.write_all(data)?;
160 let compressed = encoder
161 .finish()
162 .map_err(|e| ArrowError::ExternalError(Box::new(e)))?;
163 Ok(compressed)
164 }
165 #[cfg(not(feature = "zstd"))]
166 CompressionCodec::ZStandard => Err(ArrowError::ParseError(
167 "ZStandard codec requires zstd feature".to_string(),
168 )),
169
170 #[cfg(feature = "bzip2")]
171 CompressionCodec::Bzip2 => {
172 let mut encoder =
173 bzip2::write::BzEncoder::new(Vec::new(), bzip2::Compression::default());
174 encoder.write_all(data)?;
175 let compressed = encoder.finish()?;
176 Ok(compressed)
177 }
178 #[cfg(not(feature = "bzip2"))]
179 CompressionCodec::Bzip2 => Err(ArrowError::ParseError(
180 "Bzip2 codec requires bzip2 feature".to_string(),
181 )),
182 #[cfg(feature = "xz")]
183 CompressionCodec::Xz => {
184 let mut encoder = xz::write::XzEncoder::new(Vec::new(), 6);
185 encoder.write_all(data)?;
186 let compressed = encoder.finish()?;
187 Ok(compressed)
188 }
189 #[cfg(not(feature = "xz"))]
190 CompressionCodec::Xz => Err(ArrowError::ParseError(
191 "XZ codec requires xz feature".to_string(),
192 )),
193 }
194 }
195}