parquet/arrow/decoder/
delta_byte_array.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use bytes::Bytes;
19
20use crate::data_type::Int32Type;
21use crate::encodings::decoding::{Decoder, DeltaBitPackDecoder};
22use crate::errors::{ParquetError, Result};
23
24/// Decoder for `Encoding::DELTA_BYTE_ARRAY`
25pub struct DeltaByteArrayDecoder {
26    prefix_lengths: Vec<i32>,
27    suffix_lengths: Vec<i32>,
28    data: Bytes,
29    length_offset: usize,
30    data_offset: usize,
31    last_value: Vec<u8>,
32}
33
34impl DeltaByteArrayDecoder {
35    /// Create a new [`DeltaByteArrayDecoder`] with the provided data page
36    pub fn new(data: Bytes) -> Result<Self> {
37        let mut prefix = DeltaBitPackDecoder::<Int32Type>::new();
38        prefix.set_data(data.clone(), 0)?;
39
40        let num_prefix = prefix.values_left();
41        let mut prefix_lengths = vec![0; num_prefix];
42        assert_eq!(prefix.get(&mut prefix_lengths)?, num_prefix);
43
44        let mut suffix = DeltaBitPackDecoder::<Int32Type>::new();
45        suffix.set_data(data.slice(prefix.get_offset()..), 0)?;
46
47        let num_suffix = suffix.values_left();
48        let mut suffix_lengths = vec![0; num_suffix];
49        assert_eq!(suffix.get(&mut suffix_lengths)?, num_suffix);
50
51        if num_prefix != num_suffix {
52            return Err(general_err!(format!(
53                "inconsistent DELTA_BYTE_ARRAY lengths, prefixes: {num_prefix}, suffixes: {num_suffix}"
54            )));
55        }
56
57        assert_eq!(prefix_lengths.len(), suffix_lengths.len());
58
59        Ok(Self {
60            prefix_lengths,
61            suffix_lengths,
62            data,
63            length_offset: 0,
64            data_offset: prefix.get_offset() + suffix.get_offset(),
65            last_value: vec![],
66        })
67    }
68
69    /// Returns the number of values remaining
70    pub fn remaining(&self) -> usize {
71        self.prefix_lengths.len() - self.length_offset
72    }
73
74    /// Read up to `len` values, returning the number of values read
75    /// and calling `f` with each decoded byte slice
76    ///
77    /// Will short-circuit and return on error
78    pub fn read<F>(&mut self, len: usize, mut f: F) -> Result<usize>
79    where
80        F: FnMut(&[u8]) -> Result<()>,
81    {
82        let to_read = len.min(self.remaining());
83
84        let length_range = self.length_offset..self.length_offset + to_read;
85        let iter = self.prefix_lengths[length_range.clone()]
86            .iter()
87            .zip(&self.suffix_lengths[length_range]);
88
89        let data = self.data.as_ref();
90
91        for (prefix_length, suffix_length) in iter {
92            let prefix_length = *prefix_length as usize;
93            let suffix_length = *suffix_length as usize;
94
95            if self.data_offset + suffix_length > self.data.len() {
96                return Err(ParquetError::EOF("eof decoding byte array".into()));
97            }
98
99            self.last_value.truncate(prefix_length);
100            self.last_value
101                .extend_from_slice(&data[self.data_offset..self.data_offset + suffix_length]);
102            f(&self.last_value)?;
103
104            self.data_offset += suffix_length;
105        }
106
107        self.length_offset += to_read;
108        Ok(to_read)
109    }
110
111    /// Skip up to `to_skip` values, returning the number of values skipped
112    pub fn skip(&mut self, to_skip: usize) -> Result<usize> {
113        let to_skip = to_skip.min(self.prefix_lengths.len() - self.length_offset);
114
115        let length_range = self.length_offset..self.length_offset + to_skip;
116        let iter = self.prefix_lengths[length_range.clone()]
117            .iter()
118            .zip(&self.suffix_lengths[length_range]);
119
120        let data = self.data.as_ref();
121
122        for (prefix_length, suffix_length) in iter {
123            let prefix_length = *prefix_length as usize;
124            let suffix_length = *suffix_length as usize;
125
126            if self.data_offset + suffix_length > self.data.len() {
127                return Err(ParquetError::EOF("eof decoding byte array".into()));
128            }
129
130            self.last_value.truncate(prefix_length);
131            self.last_value
132                .extend_from_slice(&data[self.data_offset..self.data_offset + suffix_length]);
133            self.data_offset += suffix_length;
134        }
135        self.length_offset += to_skip;
136        Ok(to_skip)
137    }
138}