arrow_avro/
compression.rs1use arrow_schema::ArrowError;
19use std::io;
20use std::io::Read;
21
22pub const CODEC_METADATA_KEY: &str = "avro.codec";
24
25#[derive(Debug, Copy, Clone, Eq, PartialEq)]
26pub enum CompressionCodec {
31 Deflate,
33 Snappy,
35 ZStandard,
37 Bzip2,
39 Xz,
41}
42
43impl CompressionCodec {
44 pub(crate) fn decompress(&self, block: &[u8]) -> Result<Vec<u8>, ArrowError> {
45 match self {
46 #[cfg(feature = "deflate")]
47 CompressionCodec::Deflate => {
48 let mut decoder = flate2::read::DeflateDecoder::new(block);
49 let mut out = Vec::new();
50 decoder.read_to_end(&mut out)?;
51 Ok(out)
52 }
53 #[cfg(not(feature = "deflate"))]
54 CompressionCodec::Deflate => Err(ArrowError::ParseError(
55 "Deflate codec requires deflate feature".to_string(),
56 )),
57 #[cfg(feature = "snappy")]
58 CompressionCodec::Snappy => {
59 let crc = &block[block.len() - 4..];
62 let block = &block[..block.len() - 4];
63
64 let mut decoder = snap::raw::Decoder::new();
65 let decoded = decoder
66 .decompress_vec(block)
67 .map_err(|e| ArrowError::ExternalError(Box::new(e)))?;
68
69 let checksum = crc::Crc::<u32>::new(&crc::CRC_32_ISO_HDLC).checksum(&decoded);
70 if checksum != u32::from_be_bytes(crc.try_into().unwrap()) {
71 return Err(ArrowError::ParseError("Snappy CRC mismatch".to_string()));
72 }
73 Ok(decoded)
74 }
75 #[cfg(not(feature = "snappy"))]
76 CompressionCodec::Snappy => Err(ArrowError::ParseError(
77 "Snappy codec requires snappy feature".to_string(),
78 )),
79
80 #[cfg(feature = "zstd")]
81 CompressionCodec::ZStandard => {
82 let mut decoder = zstd::Decoder::new(block)?;
83 let mut out = Vec::new();
84 decoder.read_to_end(&mut out)?;
85 Ok(out)
86 }
87 #[cfg(not(feature = "zstd"))]
88 CompressionCodec::ZStandard => Err(ArrowError::ParseError(
89 "ZStandard codec requires zstd feature".to_string(),
90 )),
91 #[cfg(feature = "bzip2")]
92 CompressionCodec::Bzip2 => {
93 let mut decoder = bzip2::read::BzDecoder::new(block);
94 let mut out = Vec::new();
95 decoder.read_to_end(&mut out)?;
96 Ok(out)
97 }
98 #[cfg(not(feature = "bzip2"))]
99 CompressionCodec::Bzip2 => Err(ArrowError::ParseError(
100 "Bzip2 codec requires bzip2 feature".to_string(),
101 )),
102 #[cfg(feature = "xz")]
103 CompressionCodec::Xz => {
104 let mut decoder = xz::read::XzDecoder::new(block);
105 let mut out = Vec::new();
106 decoder.read_to_end(&mut out)?;
107 Ok(out)
108 }
109 #[cfg(not(feature = "xz"))]
110 CompressionCodec::Xz => Err(ArrowError::ParseError(
111 "XZ codec requires xz feature".to_string(),
112 )),
113 }
114 }
115}