arrow_avro/
compression.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use arrow_schema::ArrowError;
19#[cfg(any(
20    feature = "deflate",
21    feature = "zstd",
22    feature = "bzip2",
23    feature = "xz"
24))]
25use std::io::{Read, Write};
26
/// The metadata key used for storing the JSON encoded [`CompressionCodec`]
///
/// The literal key is `"avro.codec"`.
// NOTE(review): the Avro specification describes the `avro.codec` metadata value
// as the codec name stored as a string — confirm that "JSON encoded" is accurate.
pub const CODEC_METADATA_KEY: &str = "avro.codec";
29
/// Compression codecs that may be applied to Avro data blocks.
///
/// Every variant is always present in the enum, but actually compressing or
/// decompressing with a codec requires the matching Cargo feature; without
/// it the codec's operations return an error instead of doing any work.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum CompressionCodec {
    /// Raw Deflate stream (RFC 1951); gated by the `deflate` feature.
    Deflate,
    /// Snappy, with each block followed by a 4-byte big-endian CRC32 of the
    /// uncompressed data; gated by the `snappy` feature.
    Snappy,
    /// Zstandard; gated by the `zstd` feature.
    ZStandard,
    /// Bzip2; gated by the `bzip2` feature.
    Bzip2,
    /// Xz; gated by the `xz` feature.
    Xz,
}
47
48impl CompressionCodec {
49    #[allow(unused_variables)]
50    pub(crate) fn decompress(&self, block: &[u8]) -> Result<Vec<u8>, ArrowError> {
51        match self {
52            #[cfg(feature = "deflate")]
53            CompressionCodec::Deflate => {
54                let mut decoder = flate2::read::DeflateDecoder::new(block);
55                let mut out = Vec::new();
56                decoder.read_to_end(&mut out)?;
57                Ok(out)
58            }
59            #[cfg(not(feature = "deflate"))]
60            CompressionCodec::Deflate => Err(ArrowError::ParseError(
61                "Deflate codec requires deflate feature".to_string(),
62            )),
63            #[cfg(feature = "snappy")]
64            CompressionCodec::Snappy => {
65                // Each compressed block is followed by the 4-byte, big-endian CRC32
66                // checksum of the uncompressed data in the block.
67                let crc = &block[block.len() - 4..];
68                let block = &block[..block.len() - 4];
69
70                let mut decoder = snap::raw::Decoder::new();
71                let decoded = decoder
72                    .decompress_vec(block)
73                    .map_err(|e| ArrowError::ExternalError(Box::new(e)))?;
74
75                let checksum = crc::Crc::<u32>::new(&crc::CRC_32_ISO_HDLC).checksum(&decoded);
76                if checksum != u32::from_be_bytes(crc.try_into().unwrap()) {
77                    return Err(ArrowError::ParseError("Snappy CRC mismatch".to_string()));
78                }
79                Ok(decoded)
80            }
81            #[cfg(not(feature = "snappy"))]
82            CompressionCodec::Snappy => Err(ArrowError::ParseError(
83                "Snappy codec requires snappy feature".to_string(),
84            )),
85
86            #[cfg(feature = "zstd")]
87            CompressionCodec::ZStandard => {
88                let mut decoder = zstd::Decoder::new(block)?;
89                let mut out = Vec::new();
90                decoder.read_to_end(&mut out)?;
91                Ok(out)
92            }
93            #[cfg(not(feature = "zstd"))]
94            CompressionCodec::ZStandard => Err(ArrowError::ParseError(
95                "ZStandard codec requires zstd feature".to_string(),
96            )),
97            #[cfg(feature = "bzip2")]
98            CompressionCodec::Bzip2 => {
99                let mut decoder = bzip2::read::BzDecoder::new(block);
100                let mut out = Vec::new();
101                decoder.read_to_end(&mut out)?;
102                Ok(out)
103            }
104            #[cfg(not(feature = "bzip2"))]
105            CompressionCodec::Bzip2 => Err(ArrowError::ParseError(
106                "Bzip2 codec requires bzip2 feature".to_string(),
107            )),
108            #[cfg(feature = "xz")]
109            CompressionCodec::Xz => {
110                let mut decoder = xz::read::XzDecoder::new(block);
111                let mut out = Vec::new();
112                decoder.read_to_end(&mut out)?;
113                Ok(out)
114            }
115            #[cfg(not(feature = "xz"))]
116            CompressionCodec::Xz => Err(ArrowError::ParseError(
117                "XZ codec requires xz feature".to_string(),
118            )),
119        }
120    }
121
122    #[allow(unused_variables)]
123    pub(crate) fn compress(&self, data: &[u8]) -> Result<Vec<u8>, ArrowError> {
124        match self {
125            #[cfg(feature = "deflate")]
126            CompressionCodec::Deflate => {
127                let mut encoder =
128                    flate2::write::DeflateEncoder::new(Vec::new(), flate2::Compression::default());
129                encoder.write_all(data)?;
130                let compressed = encoder.finish()?;
131                Ok(compressed)
132            }
133            #[cfg(not(feature = "deflate"))]
134            CompressionCodec::Deflate => Err(ArrowError::ParseError(
135                "Deflate codec requires deflate feature".to_string(),
136            )),
137
138            #[cfg(feature = "snappy")]
139            CompressionCodec::Snappy => {
140                let mut encoder = snap::raw::Encoder::new();
141                // Allocate and compress in one step for efficiency
142                let mut compressed = encoder
143                    .compress_vec(data)
144                    .map_err(|e| ArrowError::ExternalError(Box::new(e)))?;
145                // Compute CRC32 (ISO‑HDLC poly) of **uncompressed** data
146                let crc_val = crc::Crc::<u32>::new(&crc::CRC_32_ISO_HDLC).checksum(data);
147                compressed.extend_from_slice(&crc_val.to_be_bytes());
148                Ok(compressed)
149            }
150            #[cfg(not(feature = "snappy"))]
151            CompressionCodec::Snappy => Err(ArrowError::ParseError(
152                "Snappy codec requires snappy feature".to_string(),
153            )),
154
155            #[cfg(feature = "zstd")]
156            CompressionCodec::ZStandard => {
157                let mut encoder = zstd::Encoder::new(Vec::new(), 0)
158                    .map_err(|e| ArrowError::ExternalError(Box::new(e)))?;
159                encoder.write_all(data)?;
160                let compressed = encoder
161                    .finish()
162                    .map_err(|e| ArrowError::ExternalError(Box::new(e)))?;
163                Ok(compressed)
164            }
165            #[cfg(not(feature = "zstd"))]
166            CompressionCodec::ZStandard => Err(ArrowError::ParseError(
167                "ZStandard codec requires zstd feature".to_string(),
168            )),
169
170            #[cfg(feature = "bzip2")]
171            CompressionCodec::Bzip2 => {
172                let mut encoder =
173                    bzip2::write::BzEncoder::new(Vec::new(), bzip2::Compression::default());
174                encoder.write_all(data)?;
175                let compressed = encoder.finish()?;
176                Ok(compressed)
177            }
178            #[cfg(not(feature = "bzip2"))]
179            CompressionCodec::Bzip2 => Err(ArrowError::ParseError(
180                "Bzip2 codec requires bzip2 feature".to_string(),
181            )),
182            #[cfg(feature = "xz")]
183            CompressionCodec::Xz => {
184                let mut encoder = xz::write::XzEncoder::new(Vec::new(), 6);
185                encoder.write_all(data)?;
186                let compressed = encoder.finish()?;
187                Ok(compressed)
188            }
189            #[cfg(not(feature = "xz"))]
190            CompressionCodec::Xz => Err(ArrowError::ParseError(
191                "XZ codec requires xz feature".to_string(),
192            )),
193        }
194    }
195}