parquet/arrow/decoder/dictionary_index.rs
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
17
use bytes::Bytes;

use crate::encodings::rle::RleDecoder;
use crate::errors::{ParquetError, Result};
22
23/// Decoder for `Encoding::RLE_DICTIONARY` indices
24pub struct DictIndexDecoder {
25 /// Decoder for the dictionary offsets array
26 decoder: RleDecoder,
27
28 /// We want to decode the offsets in chunks so we will maintain an internal buffer of decoded
29 /// offsets
30 index_buf: Box<[i32; 1024]>,
31 /// Current length of `index_buf`
32 index_buf_len: usize,
33 /// Current offset into `index_buf`. If `index_buf_offset` == `index_buf_len` then we've consumed
34 /// the entire buffer and need to decode another chunk of offsets.
35 index_offset: usize,
36
37 /// This is a maximum as the null count is not always known, e.g. value data from
38 /// a v1 data page
39 max_remaining_values: usize,
40}
41
42impl DictIndexDecoder {
43 /// Create a new [`DictIndexDecoder`] with the provided data page, the number of levels
44 /// associated with this data page, and the number of non-null values (if known)
45 pub fn new(data: Bytes, num_levels: usize, num_values: Option<usize>) -> Result<Self> {
46 let bit_width = data[0];
47 let mut decoder = RleDecoder::new(bit_width);
48 decoder.set_data(data.slice(1..))?;
49
50 Ok(Self {
51 decoder,
52 index_buf: Box::new([0; 1024]),
53 index_buf_len: 0,
54 index_offset: 0,
55 max_remaining_values: num_values.unwrap_or(num_levels),
56 })
57 }
58
59 /// Read up to `len` values, returning the number of values read
60 /// and calling `f` with each decoded dictionary index
61 ///
62 /// Will short-circuit and return on error
63 #[inline(always)]
64 pub fn read<F: FnMut(&[i32]) -> Result<()>>(&mut self, len: usize, mut f: F) -> Result<usize> {
65 let total_to_read = len.min(self.max_remaining_values);
66
67 let mut values_read = 0;
68
69 let index_buf = self.index_buf.as_mut();
70 while values_read < total_to_read {
71 if self.index_offset == self.index_buf_len {
72 // We've consumed the entire index buffer so we need to reload it before proceeding
73 let read = self.decoder.get_batch(index_buf)?;
74 if read == 0 {
75 break;
76 }
77 self.index_buf_len = read;
78 self.index_offset = 0;
79 }
80
81 let available = self.index_buf_len - self.index_offset;
82 let n = available.min(total_to_read - values_read);
83
84 f(&index_buf[self.index_offset..self.index_offset + n])?;
85
86 self.index_offset += n;
87 values_read += n;
88 }
89 self.max_remaining_values -= values_read;
90
91 Ok(values_read)
92 }
93
94 /// Skip up to `to_skip` values, returning the number of values skipped
95 pub fn skip(&mut self, to_skip: usize) -> Result<usize> {
96 let to_skip = to_skip.min(self.max_remaining_values);
97
98 let mut values_skip = 0;
99 while values_skip < to_skip {
100 if self.index_offset == self.index_buf_len {
101 // Instead of reloading the buffer, just skip in the decoder
102 let skip = self.decoder.skip(to_skip - values_skip)?;
103
104 if skip == 0 {
105 break;
106 }
107
108 self.max_remaining_values -= skip;
109 values_skip += skip;
110 } else {
111 // We still have indices buffered, so skip within the buffer
112 let skip = (to_skip - values_skip).min(self.index_buf_len - self.index_offset);
113
114 self.index_offset += skip;
115 self.max_remaining_values -= skip;
116 values_skip += skip;
117 }
118 }
119 Ok(values_skip)
120 }
121}