// parquet/arrow/decoder/dictionary_index.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use bytes::Bytes;
19
20use crate::encodings::rle::RleDecoder;
21use crate::errors::Result;
22
23/// Decoder for `Encoding::RLE_DICTIONARY` indices
24pub struct DictIndexDecoder {
25    /// Decoder for the dictionary offsets array
26    decoder: RleDecoder,
27
28    /// We want to decode the offsets in chunks so we will maintain an internal buffer of decoded
29    /// offsets
30    index_buf: Box<[i32; 1024]>,
31    /// Current length of `index_buf`
32    index_buf_len: usize,
33    /// Current offset into `index_buf`. If `index_buf_offset` == `index_buf_len` then we've consumed
34    /// the entire buffer and need to decode another chunk of offsets.
35    index_offset: usize,
36
37    /// This is a maximum as the null count is not always known, e.g. value data from
38    /// a v1 data page
39    max_remaining_values: usize,
40}
41
42impl DictIndexDecoder {
43    /// Create a new [`DictIndexDecoder`] with the provided data page, the number of levels
44    /// associated with this data page, and the number of non-null values (if known)
45    pub fn new(data: Bytes, num_levels: usize, num_values: Option<usize>) -> Self {
46        let bit_width = data[0];
47        let mut decoder = RleDecoder::new(bit_width);
48        decoder.set_data(data.slice(1..));
49
50        Self {
51            decoder,
52            index_buf: Box::new([0; 1024]),
53            index_buf_len: 0,
54            index_offset: 0,
55            max_remaining_values: num_values.unwrap_or(num_levels),
56        }
57    }
58
59    /// Read up to `len` values, returning the number of values read
60    /// and calling `f` with each decoded dictionary index
61    ///
62    /// Will short-circuit and return on error
63    pub fn read<F: FnMut(&[i32]) -> Result<()>>(&mut self, len: usize, mut f: F) -> Result<usize> {
64        let mut values_read = 0;
65
66        while values_read != len && self.max_remaining_values != 0 {
67            if self.index_offset == self.index_buf_len {
68                // We've consumed the entire index buffer so we need to reload it before proceeding
69                let read = self.decoder.get_batch(self.index_buf.as_mut())?;
70                if read == 0 {
71                    break;
72                }
73                self.index_buf_len = read;
74                self.index_offset = 0;
75            }
76
77            let to_read = (len - values_read)
78                .min(self.index_buf_len - self.index_offset)
79                .min(self.max_remaining_values);
80
81            f(&self.index_buf[self.index_offset..self.index_offset + to_read])?;
82
83            self.index_offset += to_read;
84            values_read += to_read;
85            self.max_remaining_values -= to_read;
86        }
87        Ok(values_read)
88    }
89
90    /// Skip up to `to_skip` values, returning the number of values skipped
91    pub fn skip(&mut self, to_skip: usize) -> Result<usize> {
92        let to_skip = to_skip.min(self.max_remaining_values);
93
94        let mut values_skip = 0;
95        while values_skip < to_skip {
96            if self.index_offset == self.index_buf_len {
97                // Instead of reloading the buffer, just skip in the decoder
98                let skip = self.decoder.skip(to_skip - values_skip)?;
99
100                if skip == 0 {
101                    break;
102                }
103
104                self.max_remaining_values -= skip;
105                values_skip += skip;
106            } else {
107                // We still have indices buffered, so skip within the buffer
108                let skip = (to_skip - values_skip).min(self.index_buf_len - self.index_offset);
109
110                self.index_offset += skip;
111                self.max_remaining_values -= skip;
112                values_skip += skip;
113            }
114        }
115        Ok(values_skip)
116    }
117}