arrow_avro/
compression.rs1use arrow_schema::ArrowError;
19use std::io;
20use std::io::{Read, Write};
21
/// Metadata key (`avro.codec`) under which the name of the block compression
/// codec is recorded.
pub const CODEC_METADATA_KEY: &str = "avro.codec";

/// Compression codecs that can be applied to Avro data blocks.
///
/// Every variant always exists; whether it can actually compress or
/// decompress data depends on the matching Cargo feature being enabled.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum CompressionCodec {
    /// Raw DEFLATE compression (requires the `deflate` feature).
    Deflate,
    /// Snappy compression followed by a CRC-32 trailer (requires the `snappy` feature).
    Snappy,
    /// Zstandard compression (requires the `zstd` feature).
    ZStandard,
    /// Bzip2 compression (requires the `bzip2` feature).
    Bzip2,
    /// XZ compression (requires the `xz` feature).
    Xz,
}
42
43impl CompressionCodec {
44 pub(crate) fn decompress(&self, block: &[u8]) -> Result<Vec<u8>, ArrowError> {
45 match self {
46 #[cfg(feature = "deflate")]
47 CompressionCodec::Deflate => {
48 let mut decoder = flate2::read::DeflateDecoder::new(block);
49 let mut out = Vec::new();
50 decoder.read_to_end(&mut out)?;
51 Ok(out)
52 }
53 #[cfg(not(feature = "deflate"))]
54 CompressionCodec::Deflate => Err(ArrowError::ParseError(
55 "Deflate codec requires deflate feature".to_string(),
56 )),
57 #[cfg(feature = "snappy")]
58 CompressionCodec::Snappy => {
59 let crc = &block[block.len() - 4..];
62 let block = &block[..block.len() - 4];
63
64 let mut decoder = snap::raw::Decoder::new();
65 let decoded = decoder
66 .decompress_vec(block)
67 .map_err(|e| ArrowError::ExternalError(Box::new(e)))?;
68
69 let checksum = crc::Crc::<u32>::new(&crc::CRC_32_ISO_HDLC).checksum(&decoded);
70 if checksum != u32::from_be_bytes(crc.try_into().unwrap()) {
71 return Err(ArrowError::ParseError("Snappy CRC mismatch".to_string()));
72 }
73 Ok(decoded)
74 }
75 #[cfg(not(feature = "snappy"))]
76 CompressionCodec::Snappy => Err(ArrowError::ParseError(
77 "Snappy codec requires snappy feature".to_string(),
78 )),
79
80 #[cfg(feature = "zstd")]
81 CompressionCodec::ZStandard => {
82 let mut decoder = zstd::Decoder::new(block)?;
83 let mut out = Vec::new();
84 decoder.read_to_end(&mut out)?;
85 Ok(out)
86 }
87 #[cfg(not(feature = "zstd"))]
88 CompressionCodec::ZStandard => Err(ArrowError::ParseError(
89 "ZStandard codec requires zstd feature".to_string(),
90 )),
91 #[cfg(feature = "bzip2")]
92 CompressionCodec::Bzip2 => {
93 let mut decoder = bzip2::read::BzDecoder::new(block);
94 let mut out = Vec::new();
95 decoder.read_to_end(&mut out)?;
96 Ok(out)
97 }
98 #[cfg(not(feature = "bzip2"))]
99 CompressionCodec::Bzip2 => Err(ArrowError::ParseError(
100 "Bzip2 codec requires bzip2 feature".to_string(),
101 )),
102 #[cfg(feature = "xz")]
103 CompressionCodec::Xz => {
104 let mut decoder = xz::read::XzDecoder::new(block);
105 let mut out = Vec::new();
106 decoder.read_to_end(&mut out)?;
107 Ok(out)
108 }
109 #[cfg(not(feature = "xz"))]
110 CompressionCodec::Xz => Err(ArrowError::ParseError(
111 "XZ codec requires xz feature".to_string(),
112 )),
113 }
114 }
115
116 pub(crate) fn compress(&self, data: &[u8]) -> Result<Vec<u8>, ArrowError> {
117 match self {
118 #[cfg(feature = "deflate")]
119 CompressionCodec::Deflate => {
120 let mut encoder =
121 flate2::write::DeflateEncoder::new(Vec::new(), flate2::Compression::default());
122 encoder.write_all(data)?;
123 let compressed = encoder.finish()?;
124 Ok(compressed)
125 }
126 #[cfg(not(feature = "deflate"))]
127 CompressionCodec::Deflate => Err(ArrowError::ParseError(
128 "Deflate codec requires deflate feature".to_string(),
129 )),
130
131 #[cfg(feature = "snappy")]
132 CompressionCodec::Snappy => {
133 let mut encoder = snap::raw::Encoder::new();
134 let mut compressed = encoder
136 .compress_vec(data)
137 .map_err(|e| ArrowError::ExternalError(Box::new(e)))?;
138 let crc_val = crc::Crc::<u32>::new(&crc::CRC_32_ISO_HDLC).checksum(data);
140 compressed.extend_from_slice(&crc_val.to_be_bytes());
141 Ok(compressed)
142 }
143 #[cfg(not(feature = "snappy"))]
144 CompressionCodec::Snappy => Err(ArrowError::ParseError(
145 "Snappy codec requires snappy feature".to_string(),
146 )),
147
148 #[cfg(feature = "zstd")]
149 CompressionCodec::ZStandard => {
150 let mut encoder = zstd::Encoder::new(Vec::new(), 0)
151 .map_err(|e| ArrowError::ExternalError(Box::new(e)))?;
152 encoder.write_all(data)?;
153 let compressed = encoder
154 .finish()
155 .map_err(|e| ArrowError::ExternalError(Box::new(e)))?;
156 Ok(compressed)
157 }
158 #[cfg(not(feature = "zstd"))]
159 CompressionCodec::ZStandard => Err(ArrowError::ParseError(
160 "ZStandard codec requires zstd feature".to_string(),
161 )),
162
163 #[cfg(feature = "bzip2")]
164 CompressionCodec::Bzip2 => {
165 let mut encoder =
166 bzip2::write::BzEncoder::new(Vec::new(), bzip2::Compression::default());
167 encoder.write_all(data)?;
168 let compressed = encoder.finish()?;
169 Ok(compressed)
170 }
171 #[cfg(not(feature = "bzip2"))]
172 CompressionCodec::Bzip2 => Err(ArrowError::ParseError(
173 "Bzip2 codec requires bzip2 feature".to_string(),
174 )),
175 #[cfg(feature = "xz")]
176 CompressionCodec::Xz => {
177 let mut encoder = xz::write::XzEncoder::new(Vec::new(), 6);
178 encoder.write_all(data)?;
179 let compressed = encoder.finish()?;
180 Ok(compressed)
181 }
182 #[cfg(not(feature = "xz"))]
183 CompressionCodec::Xz => Err(ArrowError::ParseError(
184 "XZ codec requires xz feature".to_string(),
185 )),
186 }
187 }
188}