Skip to main content

arrow_avro/
compression.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::errors::AvroError;
19use arrow_schema::ArrowError;
20#[cfg(any(
21    feature = "deflate",
22    feature = "zstd",
23    feature = "bzip2",
24    feature = "xz"
25))]
26use std::io::{Read, Write};
27
/// Metadata key (`avro.codec`) under which an Avro object container file header
/// records the name of the [`CompressionCodec`] used for its data blocks.
pub const CODEC_METADATA_KEY: &str = "avro.codec";
30
/// Compression codecs that can be applied to Avro data blocks.
///
/// Avro allows data blocks to be stored compressed using one of several
/// formats; each variant below names one format this implementation recognizes.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum CompressionCodec {
    /// Raw Deflate compression (RFC 1951).
    Deflate,
    /// Snappy compression.
    Snappy,
    /// Zstandard compression.
    ZStandard,
    /// Bzip2 compression.
    Bzip2,
    /// XZ compression.
    Xz,
}
48
49impl CompressionCodec {
50    #[allow(unused_variables)]
51    pub(crate) fn decompress(&self, block: &[u8]) -> Result<Vec<u8>, AvroError> {
52        match self {
53            #[cfg(feature = "deflate")]
54            CompressionCodec::Deflate => {
55                let mut decoder = flate2::read::DeflateDecoder::new(block);
56                let mut out = Vec::new();
57                decoder.read_to_end(&mut out)?;
58                Ok(out)
59            }
60            #[cfg(not(feature = "deflate"))]
61            CompressionCodec::Deflate => Err(AvroError::ParseError(
62                "Deflate codec requires deflate feature".to_string(),
63            )),
64            #[cfg(feature = "snappy")]
65            CompressionCodec::Snappy => {
66                // Each compressed block is followed by the 4-byte, big-endian CRC32
67                // checksum of the uncompressed data in the block.
68                let crc = &block[block.len() - 4..];
69                let block = &block[..block.len() - 4];
70
71                let mut decoder = snap::raw::Decoder::new();
72                let decoded = decoder
73                    .decompress_vec(block)
74                    .map_err(|e| AvroError::External(Box::new(e)))?;
75
76                let checksum = crc::Crc::<u32>::new(&crc::CRC_32_ISO_HDLC).checksum(&decoded);
77                if checksum != u32::from_be_bytes(crc.try_into().unwrap()) {
78                    return Err(AvroError::ParseError("Snappy CRC mismatch".to_string()));
79                }
80                Ok(decoded)
81            }
82            #[cfg(not(feature = "snappy"))]
83            CompressionCodec::Snappy => Err(AvroError::ParseError(
84                "Snappy codec requires snappy feature".to_string(),
85            )),
86
87            #[cfg(feature = "zstd")]
88            CompressionCodec::ZStandard => {
89                let mut decoder = zstd::Decoder::new(block)?;
90                let mut out = Vec::new();
91                decoder
92                    .read_to_end(&mut out)
93                    .map_err(|e| AvroError::External(Box::new(e)))?;
94                Ok(out)
95            }
96            #[cfg(not(feature = "zstd"))]
97            CompressionCodec::ZStandard => Err(AvroError::ParseError(
98                "ZStandard codec requires zstd feature".to_string(),
99            )),
100            #[cfg(feature = "bzip2")]
101            CompressionCodec::Bzip2 => {
102                let mut decoder = bzip2::read::BzDecoder::new(block);
103                let mut out = Vec::new();
104                decoder
105                    .read_to_end(&mut out)
106                    .map_err(|e| AvroError::External(Box::new(e)))?;
107                Ok(out)
108            }
109            #[cfg(not(feature = "bzip2"))]
110            CompressionCodec::Bzip2 => Err(AvroError::ParseError(
111                "Bzip2 codec requires bzip2 feature".to_string(),
112            )),
113            #[cfg(feature = "xz")]
114            CompressionCodec::Xz => {
115                let mut decoder = xz::read::XzDecoder::new(block);
116                let mut out = Vec::new();
117                decoder
118                    .read_to_end(&mut out)
119                    .map_err(|e| AvroError::External(Box::new(e)))?;
120                Ok(out)
121            }
122            #[cfg(not(feature = "xz"))]
123            CompressionCodec::Xz => Err(AvroError::ParseError(
124                "XZ codec requires xz feature".to_string(),
125            )),
126        }
127    }
128
129    #[allow(unused_variables)]
130    pub(crate) fn compress(&self, data: &[u8]) -> Result<Vec<u8>, ArrowError> {
131        match self {
132            #[cfg(feature = "deflate")]
133            CompressionCodec::Deflate => {
134                let mut encoder =
135                    flate2::write::DeflateEncoder::new(Vec::new(), flate2::Compression::default());
136                encoder.write_all(data)?;
137                let compressed = encoder.finish()?;
138                Ok(compressed)
139            }
140            #[cfg(not(feature = "deflate"))]
141            CompressionCodec::Deflate => Err(ArrowError::ParseError(
142                "Deflate codec requires deflate feature".to_string(),
143            )),
144
145            #[cfg(feature = "snappy")]
146            CompressionCodec::Snappy => {
147                let mut encoder = snap::raw::Encoder::new();
148                // Allocate and compress in one step for efficiency
149                let mut compressed = encoder
150                    .compress_vec(data)
151                    .map_err(|e| ArrowError::ExternalError(Box::new(e)))?;
152                // Compute CRC32 (ISO‑HDLC poly) of **uncompressed** data
153                let crc_val = crc::Crc::<u32>::new(&crc::CRC_32_ISO_HDLC).checksum(data);
154                compressed.extend_from_slice(&crc_val.to_be_bytes());
155                Ok(compressed)
156            }
157            #[cfg(not(feature = "snappy"))]
158            CompressionCodec::Snappy => Err(ArrowError::ParseError(
159                "Snappy codec requires snappy feature".to_string(),
160            )),
161
162            #[cfg(feature = "zstd")]
163            CompressionCodec::ZStandard => {
164                let mut encoder = zstd::Encoder::new(Vec::new(), 0)
165                    .map_err(|e| ArrowError::ExternalError(Box::new(e)))?;
166                encoder.write_all(data)?;
167                let compressed = encoder
168                    .finish()
169                    .map_err(|e| ArrowError::ExternalError(Box::new(e)))?;
170                Ok(compressed)
171            }
172            #[cfg(not(feature = "zstd"))]
173            CompressionCodec::ZStandard => Err(ArrowError::ParseError(
174                "ZStandard codec requires zstd feature".to_string(),
175            )),
176
177            #[cfg(feature = "bzip2")]
178            CompressionCodec::Bzip2 => {
179                let mut encoder =
180                    bzip2::write::BzEncoder::new(Vec::new(), bzip2::Compression::default());
181                encoder.write_all(data)?;
182                let compressed = encoder.finish()?;
183                Ok(compressed)
184            }
185            #[cfg(not(feature = "bzip2"))]
186            CompressionCodec::Bzip2 => Err(ArrowError::ParseError(
187                "Bzip2 codec requires bzip2 feature".to_string(),
188            )),
189            #[cfg(feature = "xz")]
190            CompressionCodec::Xz => {
191                let mut encoder = xz::write::XzEncoder::new(Vec::new(), 6);
192                encoder.write_all(data)?;
193                let compressed = encoder.finish()?;
194                Ok(compressed)
195            }
196            #[cfg(not(feature = "xz"))]
197            CompressionCodec::Xz => Err(ArrowError::ParseError(
198                "XZ codec requires xz feature".to_string(),
199            )),
200        }
201    }
202}