arrow_avro/
compression.rs1use arrow_schema::ArrowError;
19use std::io;
20use std::io::Read;
21
22pub const CODEC_METADATA_KEY: &str = "avro.codec";
24
25#[derive(Debug, Copy, Clone, Eq, PartialEq)]
26pub enum CompressionCodec {
27 Deflate,
28 Snappy,
29 ZStandard,
30}
31
32impl CompressionCodec {
33 pub(crate) fn decompress(&self, block: &[u8]) -> Result<Vec<u8>, ArrowError> {
34 match self {
35 #[cfg(feature = "deflate")]
36 CompressionCodec::Deflate => {
37 let mut decoder = flate2::read::DeflateDecoder::new(block);
38 let mut out = Vec::new();
39 decoder.read_to_end(&mut out)?;
40 Ok(out)
41 }
42 #[cfg(not(feature = "deflate"))]
43 CompressionCodec::Deflate => Err(ArrowError::ParseError(
44 "Deflate codec requires deflate feature".to_string(),
45 )),
46 #[cfg(feature = "snappy")]
47 CompressionCodec::Snappy => {
48 let crc = &block[block.len() - 4..];
51 let block = &block[..block.len() - 4];
52
53 let mut decoder = snap::raw::Decoder::new();
54 let decoded = decoder
55 .decompress_vec(block)
56 .map_err(|e| ArrowError::ExternalError(Box::new(e)))?;
57
58 let checksum = crc::Crc::<u32>::new(&crc::CRC_32_ISO_HDLC).checksum(&decoded);
59 if checksum != u32::from_be_bytes(crc.try_into().unwrap()) {
60 return Err(ArrowError::ParseError("Snappy CRC mismatch".to_string()));
61 }
62 Ok(decoded)
63 }
64 #[cfg(not(feature = "snappy"))]
65 CompressionCodec::Snappy => Err(ArrowError::ParseError(
66 "Snappy codec requires snappy feature".to_string(),
67 )),
68
69 #[cfg(feature = "zstd")]
70 CompressionCodec::ZStandard => {
71 let mut decoder = zstd::Decoder::new(block)?;
72 let mut out = Vec::new();
73 decoder.read_to_end(&mut out)?;
74 Ok(out)
75 }
76 #[cfg(not(feature = "zstd"))]
77 CompressionCodec::ZStandard => Err(ArrowError::ParseError(
78 "ZStandard codec requires zstd feature".to_string(),
79 )),
80 }
81 }
82}