arrow_avro/
compression.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use arrow_schema::ArrowError;
19use std::io;
20use std::io::{Read, Write};
21
/// The metadata key used for storing the JSON-encoded [`CompressionCodec`].
///
/// The key is the literal string `"avro.codec"`.
// NOTE(review): "JSON-encoded" is carried over from the original comment; nothing
// in this file shows how the value is serialized — confirm against the code that
// reads and writes this key.
pub const CODEC_METADATA_KEY: &str = "avro.codec";
24
/// Block compression codecs understood by this Avro implementation.
///
/// Avro data blocks may be stored compressed; each variant names one of the
/// codecs this crate can be asked to apply. Whether a given codec is actually
/// usable at runtime depends on the matching cargo feature being enabled.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum CompressionCodec {
    /// Raw Deflate stream (RFC 1951).
    Deflate,
    /// Snappy.
    Snappy,
    /// ZStandard.
    ZStandard,
    /// Bzip2.
    Bzip2,
    /// Xz.
    Xz,
}
42
43impl CompressionCodec {
44    pub(crate) fn decompress(&self, block: &[u8]) -> Result<Vec<u8>, ArrowError> {
45        match self {
46            #[cfg(feature = "deflate")]
47            CompressionCodec::Deflate => {
48                let mut decoder = flate2::read::DeflateDecoder::new(block);
49                let mut out = Vec::new();
50                decoder.read_to_end(&mut out)?;
51                Ok(out)
52            }
53            #[cfg(not(feature = "deflate"))]
54            CompressionCodec::Deflate => Err(ArrowError::ParseError(
55                "Deflate codec requires deflate feature".to_string(),
56            )),
57            #[cfg(feature = "snappy")]
58            CompressionCodec::Snappy => {
59                // Each compressed block is followed by the 4-byte, big-endian CRC32
60                // checksum of the uncompressed data in the block.
61                let crc = &block[block.len() - 4..];
62                let block = &block[..block.len() - 4];
63
64                let mut decoder = snap::raw::Decoder::new();
65                let decoded = decoder
66                    .decompress_vec(block)
67                    .map_err(|e| ArrowError::ExternalError(Box::new(e)))?;
68
69                let checksum = crc::Crc::<u32>::new(&crc::CRC_32_ISO_HDLC).checksum(&decoded);
70                if checksum != u32::from_be_bytes(crc.try_into().unwrap()) {
71                    return Err(ArrowError::ParseError("Snappy CRC mismatch".to_string()));
72                }
73                Ok(decoded)
74            }
75            #[cfg(not(feature = "snappy"))]
76            CompressionCodec::Snappy => Err(ArrowError::ParseError(
77                "Snappy codec requires snappy feature".to_string(),
78            )),
79
80            #[cfg(feature = "zstd")]
81            CompressionCodec::ZStandard => {
82                let mut decoder = zstd::Decoder::new(block)?;
83                let mut out = Vec::new();
84                decoder.read_to_end(&mut out)?;
85                Ok(out)
86            }
87            #[cfg(not(feature = "zstd"))]
88            CompressionCodec::ZStandard => Err(ArrowError::ParseError(
89                "ZStandard codec requires zstd feature".to_string(),
90            )),
91            #[cfg(feature = "bzip2")]
92            CompressionCodec::Bzip2 => {
93                let mut decoder = bzip2::read::BzDecoder::new(block);
94                let mut out = Vec::new();
95                decoder.read_to_end(&mut out)?;
96                Ok(out)
97            }
98            #[cfg(not(feature = "bzip2"))]
99            CompressionCodec::Bzip2 => Err(ArrowError::ParseError(
100                "Bzip2 codec requires bzip2 feature".to_string(),
101            )),
102            #[cfg(feature = "xz")]
103            CompressionCodec::Xz => {
104                let mut decoder = xz::read::XzDecoder::new(block);
105                let mut out = Vec::new();
106                decoder.read_to_end(&mut out)?;
107                Ok(out)
108            }
109            #[cfg(not(feature = "xz"))]
110            CompressionCodec::Xz => Err(ArrowError::ParseError(
111                "XZ codec requires xz feature".to_string(),
112            )),
113        }
114    }
115
116    pub(crate) fn compress(&self, data: &[u8]) -> Result<Vec<u8>, ArrowError> {
117        match self {
118            #[cfg(feature = "deflate")]
119            CompressionCodec::Deflate => {
120                let mut encoder =
121                    flate2::write::DeflateEncoder::new(Vec::new(), flate2::Compression::default());
122                encoder.write_all(data)?;
123                let compressed = encoder.finish()?;
124                Ok(compressed)
125            }
126            #[cfg(not(feature = "deflate"))]
127            CompressionCodec::Deflate => Err(ArrowError::ParseError(
128                "Deflate codec requires deflate feature".to_string(),
129            )),
130
131            #[cfg(feature = "snappy")]
132            CompressionCodec::Snappy => {
133                let mut encoder = snap::raw::Encoder::new();
134                // Allocate and compress in one step for efficiency
135                let mut compressed = encoder
136                    .compress_vec(data)
137                    .map_err(|e| ArrowError::ExternalError(Box::new(e)))?;
138                // Compute CRC32 (ISO‑HDLC poly) of **uncompressed** data
139                let crc_val = crc::Crc::<u32>::new(&crc::CRC_32_ISO_HDLC).checksum(data);
140                compressed.extend_from_slice(&crc_val.to_be_bytes());
141                Ok(compressed)
142            }
143            #[cfg(not(feature = "snappy"))]
144            CompressionCodec::Snappy => Err(ArrowError::ParseError(
145                "Snappy codec requires snappy feature".to_string(),
146            )),
147
148            #[cfg(feature = "zstd")]
149            CompressionCodec::ZStandard => {
150                let mut encoder = zstd::Encoder::new(Vec::new(), 0)
151                    .map_err(|e| ArrowError::ExternalError(Box::new(e)))?;
152                encoder.write_all(data)?;
153                let compressed = encoder
154                    .finish()
155                    .map_err(|e| ArrowError::ExternalError(Box::new(e)))?;
156                Ok(compressed)
157            }
158            #[cfg(not(feature = "zstd"))]
159            CompressionCodec::ZStandard => Err(ArrowError::ParseError(
160                "ZStandard codec requires zstd feature".to_string(),
161            )),
162
163            #[cfg(feature = "bzip2")]
164            CompressionCodec::Bzip2 => {
165                let mut encoder =
166                    bzip2::write::BzEncoder::new(Vec::new(), bzip2::Compression::default());
167                encoder.write_all(data)?;
168                let compressed = encoder.finish()?;
169                Ok(compressed)
170            }
171            #[cfg(not(feature = "bzip2"))]
172            CompressionCodec::Bzip2 => Err(ArrowError::ParseError(
173                "Bzip2 codec requires bzip2 feature".to_string(),
174            )),
175            #[cfg(feature = "xz")]
176            CompressionCodec::Xz => {
177                let mut encoder = xz::write::XzEncoder::new(Vec::new(), 6);
178                encoder.write_all(data)?;
179                let compressed = encoder.finish()?;
180                Ok(compressed)
181            }
182            #[cfg(not(feature = "xz"))]
183            CompressionCodec::Xz => Err(ArrowError::ParseError(
184                "XZ codec requires xz feature".to_string(),
185            )),
186        }
187    }
188}