// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Contains codec interface and supported codec implementations.
//!
//! See [`Compression`](crate::basic::Compression) enum for all available compression
//! algorithms.
//!
#[cfg_attr(
    feature = "experimental",
    doc = r##"
# Example

```no_run
use parquet::{basic::Compression, compression::{create_codec, CodecOptionsBuilder}};

let codec_options = CodecOptionsBuilder::default()
    .set_backward_compatible_lz4(false)
    .build();
let mut codec = match create_codec(Compression::SNAPPY, &codec_options) {
    Ok(Some(codec)) => codec,
    _ => panic!(),
};

let data = vec![b'p', b'a', b'r', b'q', b'u', b'e', b't'];
let mut compressed = vec![];
codec.compress(&data[..], &mut compressed).unwrap();

let mut output = vec![];
codec.decompress(&compressed[..], &mut output, None).unwrap();

assert_eq!(output, data);
```
"##
)]
use crate::basic::Compression as CodecType;
use crate::errors::{ParquetError, Result};

/// Parquet compression codec interface.
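///
/// A minimal sketch of a pass-through implementation, for illustration only
/// (the trait is only publicly reachable with the `experimental` feature):
///
/// ```ignore
/// use parquet::compression::Codec;
/// use parquet::errors::Result;
///
/// struct Identity;
///
/// impl Codec for Identity {
///     fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
///         // "Compressed" output is appended, never overwritten.
///         output_buf.extend_from_slice(input_buf);
///         Ok(())
///     }
///
///     fn decompress(
///         &mut self,
///         input_buf: &[u8],
///         output_buf: &mut Vec<u8>,
///         _uncompress_size: Option<usize>,
///     ) -> Result<usize> {
///         output_buf.extend_from_slice(input_buf);
///         // Return the number of bytes written.
///         Ok(input_buf.len())
///     }
/// }
/// ```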
pub trait Codec: Send {
    /// Compresses data stored in slice `input_buf` and appends the compressed result
    /// to `output_buf`.
    ///
    /// Note that you'll need to call `clear()` before reusing the same `output_buf`
    /// across different `compress` calls.
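    ///
    /// A sketch of reusing one buffer across calls (assumes a `codec` obtained
    /// from `create_codec`, which requires the `experimental` feature):
    ///
    /// ```ignore
    /// let mut buffer = Vec::new();
    /// codec.compress(b"first page", &mut buffer)?;
    /// // ... write `buffer` somewhere ...
    /// buffer.clear(); // without this, the next call would append after the old bytes
    /// codec.compress(b"second page", &mut buffer)?;
    /// ```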
    fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()>;

    /// Decompresses data stored in slice `input_buf` and appends output to `output_buf`.
    ///
    /// If `uncompress_size` is provided, it will allocate exactly that amount of memory.
    /// Otherwise, it will estimate the uncompressed size, allocating an amount of memory
    /// greater than or equal to the real uncompressed size.
    ///
    /// Returns the total number of bytes written.
    fn decompress(
        &mut self,
        input_buf: &[u8],
        output_buf: &mut Vec<u8>,
        uncompress_size: Option<usize>,
    ) -> Result<usize>;
}

/// Struct to hold `Codec` creation options.
#[derive(Debug, PartialEq, Eq)]
pub struct CodecOptions {
    /// Whether to fall back to older LZ4 implementations when LZ4_HADOOP decompression fails.
    backward_compatible_lz4: bool,
}

impl Default for CodecOptions {
    fn default() -> Self {
        CodecOptionsBuilder::default().build()
    }
}

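/// Builder for [`CodecOptions`].
///
/// A minimal usage sketch (public only with the `experimental` feature):
///
/// ```ignore
/// use parquet::compression::CodecOptionsBuilder;
///
/// let options = CodecOptionsBuilder::default()
///     .set_backward_compatible_lz4(false)
///     .build();
/// ```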
pub struct CodecOptionsBuilder {
    /// Whether to fall back to older LZ4 implementations when LZ4_HADOOP decompression fails.
    backward_compatible_lz4: bool,
}

impl Default for CodecOptionsBuilder {
    fn default() -> Self {
        Self {
            backward_compatible_lz4: true,
        }
    }
}

impl CodecOptionsBuilder {
    /// Enable/disable backward compatible LZ4.
    ///
    /// If backward compatible LZ4 is enabled, on LZ4_HADOOP error it will fall back to
    /// older LZ4 algorithms: LZ4_FRAME, for backward compatibility with files generated
    /// by older versions of this library, and LZ4_RAW, for backward compatibility with
    /// files generated by older versions of parquet-cpp.
    ///
    /// If backward compatible LZ4 is disabled, on LZ4_HADOOP error it will return the error.
    pub fn set_backward_compatible_lz4(mut self, value: bool) -> CodecOptionsBuilder {
        self.backward_compatible_lz4 = value;
        self
    }

    /// Builds a [`CodecOptions`] from this builder.
    pub fn build(self) -> CodecOptions {
        CodecOptions {
            backward_compatible_lz4: self.backward_compatible_lz4,
        }
    }
}

/// Defines valid compression levels.
pub(crate) trait CompressionLevel<T: std::fmt::Display + std::cmp::PartialOrd> {
    const MINIMUM_LEVEL: T;
    const MAXIMUM_LEVEL: T;

    /// Tests if the provided compression level is valid.
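    ///
    /// For example, with the crate-internal `GzipLevel` bounds of 0..=10,
    /// `GzipLevel::is_valid_level(6)` returns `Ok(())` while
    /// `GzipLevel::is_valid_level(11)` returns a `ParquetError::General`.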
    fn is_valid_level(level: T) -> Result<()> {
        let compression_range = Self::MINIMUM_LEVEL..=Self::MAXIMUM_LEVEL;
        if compression_range.contains(&level) {
            Ok(())
        } else {
            Err(ParquetError::General(format!(
                "valid compression range {}..={} exceeded.",
                compression_range.start(),
                compression_range.end()
            )))
        }
    }
}

/// Given the compression type `codec`, returns a codec used to compress and decompress
/// bytes for the compression type.
/// This returns `None` if the codec type is `UNCOMPRESSED`.
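///
/// A sketch of the `UNCOMPRESSED` case (callable with the `experimental`
/// feature, which makes this function public):
///
/// ```ignore
/// use parquet::basic::Compression;
/// use parquet::compression::{create_codec, CodecOptions};
///
/// let options = CodecOptions::default();
/// // No codec is needed for uncompressed data.
/// assert!(create_codec(Compression::UNCOMPRESSED, &options).unwrap().is_none());
/// ```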
pub fn create_codec(codec: CodecType, _options: &CodecOptions) -> Result<Option<Box<dyn Codec>>> {
    #[allow(unreachable_code, unused_variables)]
    match codec {
        CodecType::BROTLI(level) => {
            #[cfg(any(feature = "brotli", test))]
            return Ok(Some(Box::new(BrotliCodec::new(level))));
            Err(ParquetError::General(
                "Disabled feature at compile time: brotli".into(),
            ))
        }
        CodecType::GZIP(level) => {
            #[cfg(any(feature = "flate2", test))]
            return Ok(Some(Box::new(GZipCodec::new(level))));
            Err(ParquetError::General(
                "Disabled feature at compile time: flate2".into(),
            ))
        }
        CodecType::SNAPPY => {
            #[cfg(any(feature = "snap", test))]
            return Ok(Some(Box::new(SnappyCodec::new())));
            Err(ParquetError::General(
                "Disabled feature at compile time: snap".into(),
            ))
        }
        CodecType::LZ4 => {
            #[cfg(any(feature = "lz4", test))]
            return Ok(Some(Box::new(LZ4HadoopCodec::new(
                _options.backward_compatible_lz4,
            ))));
            Err(ParquetError::General(
                "Disabled feature at compile time: lz4".into(),
            ))
        }
        CodecType::ZSTD(level) => {
            #[cfg(any(feature = "zstd", test))]
            return Ok(Some(Box::new(ZSTDCodec::new(level))));
            Err(ParquetError::General(
                "Disabled feature at compile time: zstd".into(),
            ))
        }
        CodecType::LZ4_RAW => {
            #[cfg(any(feature = "lz4", test))]
            return Ok(Some(Box::new(LZ4RawCodec::new())));
            Err(ParquetError::General(
                "Disabled feature at compile time: lz4".into(),
            ))
        }
        CodecType::UNCOMPRESSED => Ok(None),
        _ => Err(nyi_err!("The codec type {} is not supported yet", codec)),
    }
}

#[cfg(any(feature = "snap", test))]
mod snappy_codec {
    use snap::raw::{decompress_len, max_compress_len, Decoder, Encoder};

    use crate::compression::Codec;
    use crate::errors::Result;

    /// Codec for Snappy compression format.
    pub struct SnappyCodec {
        decoder: Decoder,
        encoder: Encoder,
    }

    impl SnappyCodec {
        /// Creates new Snappy compression codec.
        pub(crate) fn new() -> Self {
            Self {
                decoder: Decoder::new(),
                encoder: Encoder::new(),
            }
        }
    }

    impl Codec for SnappyCodec {
        fn decompress(
            &mut self,
            input_buf: &[u8],
            output_buf: &mut Vec<u8>,
            uncompress_size: Option<usize>,
        ) -> Result<usize> {
            let len = match uncompress_size {
                Some(size) => size,
                None => decompress_len(input_buf)?,
            };
            let offset = output_buf.len();
            output_buf.resize(offset + len, 0);
            self.decoder
                .decompress(input_buf, &mut output_buf[offset..])
                .map_err(|e| e.into())
        }

        fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
            let output_buf_len = output_buf.len();
            let required_len = max_compress_len(input_buf.len());
            output_buf.resize(output_buf_len + required_len, 0);
            let n = self
                .encoder
                .compress(input_buf, &mut output_buf[output_buf_len..])?;
            output_buf.truncate(output_buf_len + n);
            Ok(())
        }
    }
}
#[cfg(any(feature = "snap", test))]
pub use snappy_codec::*;

#[cfg(any(feature = "flate2", test))]
mod gzip_codec {

    use std::io::{Read, Write};

    use flate2::{read, write, Compression};

    use crate::compression::Codec;
    use crate::errors::Result;

    use super::GzipLevel;

    /// Codec for GZIP compression algorithm.
    pub struct GZipCodec {
        level: GzipLevel,
    }

    impl GZipCodec {
        /// Creates new GZIP compression codec.
        pub(crate) fn new(level: GzipLevel) -> Self {
            Self { level }
        }
    }

    impl Codec for GZipCodec {
        fn decompress(
            &mut self,
            input_buf: &[u8],
            output_buf: &mut Vec<u8>,
            _uncompress_size: Option<usize>,
        ) -> Result<usize> {
            let mut decoder = read::MultiGzDecoder::new(input_buf);
            decoder.read_to_end(output_buf).map_err(|e| e.into())
        }

        fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
            let mut encoder = write::GzEncoder::new(output_buf, Compression::new(self.level.0));
            encoder.write_all(input_buf)?;
            encoder.try_finish().map_err(|e| e.into())
        }
    }
}
#[cfg(any(feature = "flate2", test))]
pub use gzip_codec::*;

/// Represents a valid gzip compression level.
///
/// Defaults to 6.
///
/// * 0: least compression
/// * 9: most compression (that other software can read)
/// * 10: most compression (incompatible with other software, see below)
///
/// #### WARNING:
/// Level 10 compression can offer the smallest file size,
/// but Parquet files created with it will not be readable
/// by other "standard" parquet readers.
///
/// Do **NOT** use level 10 if you need other software to
/// be able to read the files. Read below for details.
///
/// ### IMPORTANT:
/// There's often confusion about the compression levels in `flate2` vs `arrow`,
/// as highlighted in issue [#6282](https://github.com/apache/arrow-rs/issues/6282).
///
/// `flate2` supports two compression backends: `miniz_oxide` and `zlib`.
///
/// - `zlib` supports levels from 0 to 9.
/// - `miniz_oxide` supports levels from 0 to 10.
///
/// `arrow` uses `flate2` with the `rust_backend` feature,
/// which provides `miniz_oxide` as the backend.
/// Therefore levels 0-10 are supported.
///
/// `flate2` documents this behavior properly with
/// [this commit](https://github.com/rust-lang/flate2-rs/pull/430).
#[derive(Debug, Eq, PartialEq, Hash, Clone, Copy)]
pub struct GzipLevel(u32);

impl Default for GzipLevel {
    fn default() -> Self {
        // The default as of miniz_oxide 0.5.1 is 6 for compression level
        // (miniz_oxide::deflate::CompressionLevel::DefaultLevel)
        Self(6)
    }
}

impl CompressionLevel<u32> for GzipLevel {
    const MINIMUM_LEVEL: u32 = 0;
    const MAXIMUM_LEVEL: u32 = 10;
}

impl GzipLevel {
    /// Attempts to create a gzip compression level.
    ///
    /// Compression levels must be valid (i.e. be acceptable for [`flate2::Compression`]).
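    ///
    /// A short usage sketch (assumes the `parquet::basic` re-export of this type):
    ///
    /// ```
    /// use parquet::basic::GzipLevel;
    ///
    /// // Levels 0..=10 are accepted; anything above is rejected.
    /// let level = GzipLevel::try_new(6).unwrap();
    /// assert_eq!(level.compression_level(), 6);
    /// assert!(GzipLevel::try_new(11).is_err());
    /// ```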
    pub fn try_new(level: u32) -> Result<Self> {
        Self::is_valid_level(level).map(|_| Self(level))
    }

    /// Returns the compression level.
    pub fn compression_level(&self) -> u32 {
        self.0
    }
}

#[cfg(any(feature = "brotli", test))]
mod brotli_codec {

    use std::io::{Read, Write};

    use crate::compression::Codec;
    use crate::errors::Result;

    use super::BrotliLevel;

    const BROTLI_DEFAULT_BUFFER_SIZE: usize = 4096;
    const BROTLI_DEFAULT_LG_WINDOW_SIZE: u32 = 22; // recommended between 20-22

    /// Codec for Brotli compression algorithm.
    pub struct BrotliCodec {
        level: BrotliLevel,
    }

    impl BrotliCodec {
        /// Creates new Brotli compression codec.
        pub(crate) fn new(level: BrotliLevel) -> Self {
            Self { level }
        }
    }

    impl Codec for BrotliCodec {
        fn decompress(
            &mut self,
            input_buf: &[u8],
            output_buf: &mut Vec<u8>,
            uncompress_size: Option<usize>,
        ) -> Result<usize> {
            let buffer_size = uncompress_size.unwrap_or(BROTLI_DEFAULT_BUFFER_SIZE);
            brotli::Decompressor::new(input_buf, buffer_size)
                .read_to_end(output_buf)
                .map_err(|e| e.into())
        }

        fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
            let mut encoder = brotli::CompressorWriter::new(
                output_buf,
                BROTLI_DEFAULT_BUFFER_SIZE,
                self.level.0,
                BROTLI_DEFAULT_LG_WINDOW_SIZE,
            );
            encoder.write_all(input_buf)?;
            encoder.flush().map_err(|e| e.into())
        }
    }
}
#[cfg(any(feature = "brotli", test))]
pub use brotli_codec::*;

/// Represents a valid brotli compression level.
#[derive(Debug, Eq, PartialEq, Hash, Clone, Copy)]
pub struct BrotliLevel(u32);

impl Default for BrotliLevel {
    fn default() -> Self {
        Self(1)
    }
}

impl CompressionLevel<u32> for BrotliLevel {
    const MINIMUM_LEVEL: u32 = 0;
    const MAXIMUM_LEVEL: u32 = 11;
}

impl BrotliLevel {
    /// Attempts to create a brotli compression level.
    ///
    /// Compression levels must be valid.
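    ///
    /// A short usage sketch (assumes the `parquet::basic` re-export of this type):
    ///
    /// ```
    /// use parquet::basic::BrotliLevel;
    ///
    /// // Levels 0..=11 are accepted.
    /// let level = BrotliLevel::try_new(5).unwrap();
    /// assert_eq!(level.compression_level(), 5);
    /// assert!(BrotliLevel::try_new(12).is_err());
    /// ```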
    pub fn try_new(level: u32) -> Result<Self> {
        Self::is_valid_level(level).map(|_| Self(level))
    }

    /// Returns the compression level.
    pub fn compression_level(&self) -> u32 {
        self.0
    }
}

#[cfg(any(feature = "lz4", test))]
mod lz4_codec {
    use std::io::{Read, Write};

    use crate::compression::Codec;
    use crate::errors::{ParquetError, Result};

    const LZ4_BUFFER_SIZE: usize = 4096;

    /// Codec for LZ4 compression algorithm.
    pub struct LZ4Codec {}

    impl LZ4Codec {
        /// Creates new LZ4 compression codec.
        pub(crate) fn new() -> Self {
            Self {}
        }
    }

    impl Codec for LZ4Codec {
        fn decompress(
            &mut self,
            input_buf: &[u8],
            output_buf: &mut Vec<u8>,
            _uncompress_size: Option<usize>,
        ) -> Result<usize> {
            let mut decoder = lz4_flex::frame::FrameDecoder::new(input_buf);
            let mut buffer: [u8; LZ4_BUFFER_SIZE] = [0; LZ4_BUFFER_SIZE];
            let mut total_len = 0;
            loop {
                let len = decoder.read(&mut buffer)?;
                if len == 0 {
                    break;
                }
                total_len += len;
                output_buf.write_all(&buffer[0..len])?;
            }
            Ok(total_len)
        }

        fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
            let mut encoder = lz4_flex::frame::FrameEncoder::new(output_buf);
            let mut from = 0;
            loop {
                let to = std::cmp::min(from + LZ4_BUFFER_SIZE, input_buf.len());
                encoder.write_all(&input_buf[from..to])?;
                from += LZ4_BUFFER_SIZE;
                if from >= input_buf.len() {
                    break;
                }
            }
            match encoder.finish() {
                Ok(_) => Ok(()),
                Err(e) => Err(ParquetError::External(Box::new(e))),
            }
        }
    }
}

#[cfg(all(feature = "experimental", any(feature = "lz4", test)))]
pub use lz4_codec::*;

#[cfg(any(feature = "zstd", test))]
mod zstd_codec {
    use std::io::{self, Write};

    use crate::compression::{Codec, ZstdLevel};
    use crate::errors::Result;

    /// Codec for Zstandard compression algorithm.
    pub struct ZSTDCodec {
        level: ZstdLevel,
    }

    impl ZSTDCodec {
        /// Creates new Zstandard compression codec.
        pub(crate) fn new(level: ZstdLevel) -> Self {
            Self { level }
        }
    }

    impl Codec for ZSTDCodec {
        fn decompress(
            &mut self,
            input_buf: &[u8],
            output_buf: &mut Vec<u8>,
            _uncompress_size: Option<usize>,
        ) -> Result<usize> {
            let mut decoder = zstd::Decoder::new(input_buf)?;
            match io::copy(&mut decoder, output_buf) {
                Ok(n) => Ok(n as usize),
                Err(e) => Err(e.into()),
            }
        }

        fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
            let mut encoder = zstd::Encoder::new(output_buf, self.level.0)?;
            encoder.write_all(input_buf)?;
            match encoder.finish() {
                Ok(_) => Ok(()),
                Err(e) => Err(e.into()),
            }
        }
    }
}
#[cfg(any(feature = "zstd", test))]
pub use zstd_codec::*;

/// Represents a valid zstd compression level.
#[derive(Debug, Eq, PartialEq, Hash, Clone, Copy)]
pub struct ZstdLevel(i32);

impl CompressionLevel<i32> for ZstdLevel {
    // zstd binds to C, and hence zstd::compression_level_range() is not const as this calls the
    // underlying C library.
    const MINIMUM_LEVEL: i32 = 1;
    const MAXIMUM_LEVEL: i32 = 22;
}

impl ZstdLevel {
    /// Attempts to create a zstd compression level from a given compression level.
    ///
    /// Compression levels must be valid (i.e. be acceptable for [`zstd::compression_level_range`]).
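    ///
    /// A short usage sketch (assumes the `parquet::basic` re-export of this type):
    ///
    /// ```
    /// use parquet::basic::ZstdLevel;
    ///
    /// // Levels 1..=22 are accepted.
    /// let level = ZstdLevel::try_new(3).unwrap();
    /// assert_eq!(level.compression_level(), 3);
    /// assert!(ZstdLevel::try_new(0).is_err());
    /// ```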
    pub fn try_new(level: i32) -> Result<Self> {
        Self::is_valid_level(level).map(|_| Self(level))
    }

    /// Returns the compression level.
    pub fn compression_level(&self) -> i32 {
        self.0
    }
}

impl Default for ZstdLevel {
    fn default() -> Self {
        Self(1)
    }
}

#[cfg(any(feature = "lz4", test))]
mod lz4_raw_codec {
    use crate::compression::Codec;
    use crate::errors::ParquetError;
    use crate::errors::Result;

    /// Codec for LZ4 Raw compression algorithm.
    pub struct LZ4RawCodec {}

    impl LZ4RawCodec {
        /// Creates new LZ4 Raw compression codec.
        pub(crate) fn new() -> Self {
            Self {}
        }
    }

    impl Codec for LZ4RawCodec {
        fn decompress(
            &mut self,
            input_buf: &[u8],
            output_buf: &mut Vec<u8>,
            uncompress_size: Option<usize>,
        ) -> Result<usize> {
            let offset = output_buf.len();
            let required_len = match uncompress_size {
                Some(uncompress_size) => uncompress_size,
                None => {
                    return Err(ParquetError::General(
                        "LZ4RawCodec unsupported without uncompress_size".into(),
                    ))
                }
            };
            output_buf.resize(offset + required_len, 0);
            match lz4_flex::block::decompress_into(input_buf, &mut output_buf[offset..]) {
                Ok(n) => {
                    if n != required_len {
                        return Err(ParquetError::General(
                            "LZ4RawCodec uncompress_size is not the expected one".into(),
                        ));
                    }
                    Ok(n)
                }
                Err(e) => Err(ParquetError::External(Box::new(e))),
            }
        }

        fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
            let offset = output_buf.len();
            let required_len = lz4_flex::block::get_maximum_output_size(input_buf.len());
            output_buf.resize(offset + required_len, 0);
            match lz4_flex::block::compress_into(input_buf, &mut output_buf[offset..]) {
                Ok(n) => {
                    output_buf.truncate(offset + n);
                    Ok(())
                }
                Err(e) => Err(ParquetError::External(Box::new(e))),
            }
        }
    }
}
#[cfg(any(feature = "lz4", test))]
pub use lz4_raw_codec::*;

#[cfg(any(feature = "lz4", test))]
mod lz4_hadoop_codec {
    use crate::compression::lz4_codec::LZ4Codec;
    use crate::compression::lz4_raw_codec::LZ4RawCodec;
    use crate::compression::Codec;
    use crate::errors::{ParquetError, Result};
    use std::io;

    /// Size of u32 type.
    const SIZE_U32: usize = std::mem::size_of::<u32>();

    /// Length of the LZ4_HADOOP prefix.
    const PREFIX_LEN: usize = SIZE_U32 * 2;

    /// Codec for LZ4 Hadoop compression algorithm.
    pub struct LZ4HadoopCodec {
        /// Whether or not to fall back to other LZ4 implementations on error.
        /// Fallback is done to be backward compatible with older versions of this
        /// library and older versions of parquet-cpp.
        backward_compatible_lz4: bool,
    }

    impl LZ4HadoopCodec {
        /// Creates new LZ4 Hadoop compression codec.
        pub(crate) fn new(backward_compatible_lz4: bool) -> Self {
            Self {
                backward_compatible_lz4,
            }
        }
    }

    /// Try to decompress the buffer as if it was compressed with the Hadoop Lz4Codec.
    /// Adapted from pola-rs [compression.rs:try_decompress_hadoop](https://pola-rs.github.io/polars/src/parquet2/compression.rs.html#225)
    /// Translated from the apache arrow c++ function [TryDecompressHadoop](https://github.com/apache/arrow/blob/bf18e6e4b5bb6180706b1ba0d597a65a4ce5ca48/cpp/src/arrow/util/compression_lz4.cc#L474).
    /// Returns error if decompression failed.
    fn try_decompress_hadoop(input_buf: &[u8], output_buf: &mut [u8]) -> io::Result<usize> {
        // Parquet files written with the Hadoop Lz4Codec use their own framing.
        // The input buffer can contain an arbitrary number of "frames", each
        // with the following structure:
        // - bytes 0..3: big-endian uint32_t representing the frame decompressed size
        // - bytes 4..7: big-endian uint32_t representing the frame compressed size
        // - bytes 8...: frame compressed data
        //
        // The Hadoop Lz4Codec source code can be found here:
        // https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/Lz4Codec.cc
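        //
        // For example, a single frame whose decompressed payload is 100 bytes
        // and whose LZ4 block is 60 bytes would be laid out as
        //   00 00 00 64 | 00 00 00 3C | <60 bytes of LZ4 block data>
        // (the sizes here are illustrative, not produced by a real encoder).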
        let mut input_len = input_buf.len();
        let mut input = input_buf;
        let mut read_bytes = 0;
        let mut output_len = output_buf.len();
        let mut output: &mut [u8] = output_buf;
        while input_len >= PREFIX_LEN {
            let mut bytes = [0; SIZE_U32];
            bytes.copy_from_slice(&input[0..4]);
            let expected_decompressed_size = u32::from_be_bytes(bytes);
            let mut bytes = [0; SIZE_U32];
            bytes.copy_from_slice(&input[4..8]);
            let expected_compressed_size = u32::from_be_bytes(bytes);
            input = &input[PREFIX_LEN..];
            input_len -= PREFIX_LEN;

            if input_len < expected_compressed_size as usize {
                return Err(io::Error::new(
                    io::ErrorKind::Other,
                    "Not enough bytes for Hadoop frame",
                ));
            }

            if output_len < expected_decompressed_size as usize {
                return Err(io::Error::new(
                    io::ErrorKind::Other,
                    "Not enough bytes to hold advertised output",
                ));
            }
            let decompressed_size =
                lz4_flex::decompress_into(&input[..expected_compressed_size as usize], output)
                    .map_err(|e| ParquetError::External(Box::new(e)))?;
            if decompressed_size != expected_decompressed_size as usize {
                return Err(io::Error::new(
                    io::ErrorKind::Other,
                    "Unexpected decompressed size",
                ));
            }
            input_len -= expected_compressed_size as usize;
            output_len -= expected_decompressed_size as usize;
            read_bytes += expected_decompressed_size as usize;
            if input_len > expected_compressed_size as usize {
                input = &input[expected_compressed_size as usize..];
                output = &mut output[expected_decompressed_size as usize..];
            } else {
                break;
            }
        }
        if input_len == 0 {
            Ok(read_bytes)
        } else {
            Err(io::Error::new(
                io::ErrorKind::Other,
                "Not all input was consumed",
            ))
        }
    }

    impl Codec for LZ4HadoopCodec {
        fn decompress(
            &mut self,
            input_buf: &[u8],
            output_buf: &mut Vec<u8>,
            uncompress_size: Option<usize>,
        ) -> Result<usize> {
            let output_len = output_buf.len();
            let required_len = match uncompress_size {
                Some(n) => n,
                None => {
                    return Err(ParquetError::General(
                        "LZ4HadoopCodec unsupported without uncompress_size".into(),
                    ))
                }
            };
            output_buf.resize(output_len + required_len, 0);
            match try_decompress_hadoop(input_buf, &mut output_buf[output_len..]) {
                Ok(n) => {
                    if n != required_len {
                        return Err(ParquetError::General(
                            "LZ4HadoopCodec uncompress_size is not the expected one".into(),
                        ));
                    }
                    Ok(n)
                }
                Err(e) if !self.backward_compatible_lz4 => Err(e.into()),
                // Fallback done to be backward compatible with older versions of this
                // library and older versions of parquet-cpp.
                Err(_) => {
                    // Truncate any inserted element before trying next algorithm.
                    output_buf.truncate(output_len);
                    match LZ4Codec::new().decompress(input_buf, output_buf, uncompress_size) {
                        Ok(n) => Ok(n),
                        Err(_) => {
                            // Truncate any inserted element before trying next algorithm.
                            output_buf.truncate(output_len);
                            LZ4RawCodec::new().decompress(input_buf, output_buf, uncompress_size)
                        }
                    }
                }
            }
        }

        fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
            // Allocate memory to store the LZ4_HADOOP prefix.
            let offset = output_buf.len();
            output_buf.resize(offset + PREFIX_LEN, 0);

            // Append LZ4_RAW compressed bytes after prefix.
            LZ4RawCodec::new().compress(input_buf, output_buf)?;

            // Prepend decompressed size and compressed size in big endian to be compatible
            // with LZ4_HADOOP.
            let output_buf = &mut output_buf[offset..];
            let compressed_size = output_buf.len() - PREFIX_LEN;
            let compressed_size = compressed_size as u32;
            let uncompressed_size = input_buf.len() as u32;
            output_buf[..SIZE_U32].copy_from_slice(&uncompressed_size.to_be_bytes());
            output_buf[SIZE_U32..PREFIX_LEN].copy_from_slice(&compressed_size.to_be_bytes());

            Ok(())
        }
    }
}
#[cfg(any(feature = "lz4", test))]
pub use lz4_hadoop_codec::*;

#[cfg(test)]
mod tests {
    use super::*;

    use crate::util::test_common::rand_gen::random_bytes;

    fn test_roundtrip(c: CodecType, data: &[u8], uncompress_size: Option<usize>) {
        let codec_options = CodecOptionsBuilder::default()
            .set_backward_compatible_lz4(false)
            .build();
        let mut c1 = create_codec(c, &codec_options).unwrap().unwrap();
        let mut c2 = create_codec(c, &codec_options).unwrap().unwrap();

        // Compress with c1
        let mut compressed = Vec::new();
        let mut decompressed = Vec::new();
        c1.compress(data, &mut compressed)
            .expect("Error when compressing");

        // Decompress with c2
        let decompressed_size = c2
            .decompress(compressed.as_slice(), &mut decompressed, uncompress_size)
            .expect("Error when decompressing");
        assert_eq!(data.len(), decompressed_size);
        assert_eq!(data, decompressed.as_slice());

        decompressed.clear();
        compressed.clear();

        // Compress with c2
        c2.compress(data, &mut compressed)
            .expect("Error when compressing");

        // Decompress with c1
        let decompressed_size = c1
            .decompress(compressed.as_slice(), &mut decompressed, uncompress_size)
            .expect("Error when decompressing");
        assert_eq!(data.len(), decompressed_size);
        assert_eq!(data, decompressed.as_slice());

        decompressed.clear();
        compressed.clear();

        // Test does not trample existing data in output buffers
        let prefix = &[0xDE, 0xAD, 0xBE, 0xEF];
        decompressed.extend_from_slice(prefix);
        compressed.extend_from_slice(prefix);

        c2.compress(data, &mut compressed)
            .expect("Error when compressing");

        assert_eq!(&compressed[..4], prefix);

        let decompressed_size = c2
            .decompress(&compressed[4..], &mut decompressed, uncompress_size)
            .expect("Error when decompressing");

        assert_eq!(data.len(), decompressed_size);
        assert_eq!(data, &decompressed[4..]);
        assert_eq!(&decompressed[..4], prefix);
    }

    fn test_codec_with_size(c: CodecType) {
        let sizes = vec![100, 10000, 100000];
        for size in sizes {
            let data = random_bytes(size);
            test_roundtrip(c, &data, Some(data.len()));
        }
    }

    fn test_codec_without_size(c: CodecType) {
        let sizes = vec![100, 10000, 100000];
        for size in sizes {
            let data = random_bytes(size);
            test_roundtrip(c, &data, None);
        }
    }

    #[test]
    fn test_codec_snappy() {
        test_codec_with_size(CodecType::SNAPPY);
        test_codec_without_size(CodecType::SNAPPY);
    }

    #[test]
    fn test_codec_gzip() {
        for level in GzipLevel::MINIMUM_LEVEL..=GzipLevel::MAXIMUM_LEVEL {
            let level = GzipLevel::try_new(level).unwrap();
            test_codec_with_size(CodecType::GZIP(level));
            test_codec_without_size(CodecType::GZIP(level));
        }
    }

    #[test]
    fn test_codec_brotli() {
        for level in BrotliLevel::MINIMUM_LEVEL..=BrotliLevel::MAXIMUM_LEVEL {
            let level = BrotliLevel::try_new(level).unwrap();
            test_codec_with_size(CodecType::BROTLI(level));
            test_codec_without_size(CodecType::BROTLI(level));
        }
    }

    #[test]
    fn test_codec_lz4() {
        test_codec_with_size(CodecType::LZ4);
    }

    #[test]
    fn test_codec_zstd() {
        for level in ZstdLevel::MINIMUM_LEVEL..=ZstdLevel::MAXIMUM_LEVEL {
            let level = ZstdLevel::try_new(level).unwrap();
            test_codec_with_size(CodecType::ZSTD(level));
            test_codec_without_size(CodecType::ZSTD(level));
        }
    }

    #[test]
    fn test_codec_lz4_raw() {
        test_codec_with_size(CodecType::LZ4_RAW);
    }
}