arrow_avro/compression.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use arrow_schema::ArrowError;
19use std::io;
20use std::io::Read;
21
/// The metadata key used for storing the [`CompressionCodec`] of an Avro file.
///
/// NOTE(review): this key was previously documented as holding a "JSON encoded"
/// codec. The Avro specification describes the `avro.codec` value as the plain
/// codec name (for example `deflate`); confirm against the header parser which
/// form is actually expected.
pub const CODEC_METADATA_KEY: &str = "avro.codec";
24
/// Compression codecs that can be applied to Avro data blocks.
///
/// Each variant names one block compression format that this implementation
/// recognises. Whether a given codec can actually be decoded depends on the
/// matching crate feature being enabled.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum CompressionCodec {
    /// Deflate compression, as specified by RFC 1951
    Deflate,
    /// Snappy compression
    Snappy,
    /// ZStandard compression
    ZStandard,
}
38
39impl CompressionCodec {
40 pub(crate) fn decompress(&self, block: &[u8]) -> Result<Vec<u8>, ArrowError> {
41 match self {
42 #[cfg(feature = "deflate")]
43 CompressionCodec::Deflate => {
44 let mut decoder = flate2::read::DeflateDecoder::new(block);
45 let mut out = Vec::new();
46 decoder.read_to_end(&mut out)?;
47 Ok(out)
48 }
49 #[cfg(not(feature = "deflate"))]
50 CompressionCodec::Deflate => Err(ArrowError::ParseError(
51 "Deflate codec requires deflate feature".to_string(),
52 )),
53 #[cfg(feature = "snappy")]
54 CompressionCodec::Snappy => {
55 // Each compressed block is followed by the 4-byte, big-endian CRC32
56 // checksum of the uncompressed data in the block.
57 let crc = &block[block.len() - 4..];
58 let block = &block[..block.len() - 4];
59
60 let mut decoder = snap::raw::Decoder::new();
61 let decoded = decoder
62 .decompress_vec(block)
63 .map_err(|e| ArrowError::ExternalError(Box::new(e)))?;
64
65 let checksum = crc::Crc::<u32>::new(&crc::CRC_32_ISO_HDLC).checksum(&decoded);
66 if checksum != u32::from_be_bytes(crc.try_into().unwrap()) {
67 return Err(ArrowError::ParseError("Snappy CRC mismatch".to_string()));
68 }
69 Ok(decoded)
70 }
71 #[cfg(not(feature = "snappy"))]
72 CompressionCodec::Snappy => Err(ArrowError::ParseError(
73 "Snappy codec requires snappy feature".to_string(),
74 )),
75
76 #[cfg(feature = "zstd")]
77 CompressionCodec::ZStandard => {
78 let mut decoder = zstd::Decoder::new(block)?;
79 let mut out = Vec::new();
80 decoder.read_to_end(&mut out)?;
81 Ok(out)
82 }
83 #[cfg(not(feature = "zstd"))]
84 CompressionCodec::ZStandard => Err(ArrowError::ParseError(
85 "ZStandard codec requires zstd feature".to_string(),
86 )),
87 }
88 }
89}