parquet/arrow/record_reader/definition_levels.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow_array::builder::BooleanBufferBuilder;
use arrow_buffer::bit_chunk_iterator::UnalignedBitChunk;
use arrow_buffer::Buffer;
use bytes::Bytes;

use crate::arrow::buffer::bit_util::count_set_bits;
use crate::basic::Encoding;
use crate::column::reader::decoder::{
    ColumnLevelDecoder, DefinitionLevelDecoder, DefinitionLevelDecoderImpl,
};
use crate::errors::{ParquetError, Result};
use crate::schema::types::ColumnDescPtr;

enum BufferInner {
    /// Compute levels and null mask
    Full {
        levels: Vec<i16>,
        nulls: BooleanBufferBuilder,
        max_level: i16,
    },
    /// Only compute null bitmask - requires max level to be 1
    ///
    /// This is an optimisation for the common case of a nullable scalar column, as decoding
    /// the definition level data is only required when decoding nested structures
    ///
    Mask { nulls: BooleanBufferBuilder },
}

pub struct DefinitionLevelBuffer {
    inner: BufferInner,

    /// The length of this buffer
    ///
    /// Note: the level and null buffers in `inner` may contain more elements than `len`
    len: usize,
}

impl DefinitionLevelBuffer {
    pub fn new(desc: &ColumnDescPtr, null_mask_only: bool) -> Self {
        let inner = match null_mask_only {
            true => {
                assert_eq!(
                    desc.max_def_level(),
                    1,
                    "max definition level must be 1 to only compute null bitmask"
                );

                assert_eq!(
                    desc.max_rep_level(),
                    0,
                    "max repetition level must be 0 to only compute null bitmask"
                );

                BufferInner::Mask {
                    nulls: BooleanBufferBuilder::new(0),
                }
            }
            false => BufferInner::Full {
                levels: Vec::new(),
                nulls: BooleanBufferBuilder::new(0),
                max_level: desc.max_def_level(),
            },
        };

        Self { inner, len: 0 }
    }

    /// Returns the built level data, or `None` if only the null mask was computed
    pub fn consume_levels(&mut self) -> Option<Vec<i16>> {
        match &mut self.inner {
            BufferInner::Full { levels, .. } => Some(std::mem::take(levels)),
            BufferInner::Mask { .. } => None,
        }
    }

    /// Returns the built null bitmask
    pub fn consume_bitmask(&mut self) -> Buffer {
        self.len = 0;
        match &mut self.inner {
            BufferInner::Full { nulls, .. } => nulls.finish().into_inner(),
            BufferInner::Mask { nulls } => nulls.finish().into_inner(),
        }
    }

    pub fn nulls(&self) -> &BooleanBufferBuilder {
        match &self.inner {
            BufferInner::Full { nulls, .. } => nulls,
            BufferInner::Mask { nulls } => nulls,
        }
    }
}

/// The decoder used by [`DefinitionLevelBufferDecoder`], either the optimised
/// bitmask decoder or the generic fallback
enum MaybePacked {
    /// Decodes directly into a null bitmask, requires a max definition level of 1
    Packed(PackedDecoder),
    /// Decodes into `i16` levels, handles arbitrary max definition levels
    Fallback(DefinitionLevelDecoderImpl),
}

/// A [`DefinitionLevelDecoder`] that decodes into a [`DefinitionLevelBuffer`]
pub struct DefinitionLevelBufferDecoder {
    max_level: i16,
    decoder: MaybePacked,
}

impl DefinitionLevelBufferDecoder {
    pub fn new(max_level: i16, packed: bool) -> Self {
        let decoder = match packed {
            true => MaybePacked::Packed(PackedDecoder::new()),
            false => MaybePacked::Fallback(DefinitionLevelDecoderImpl::new(max_level)),
        };

        Self { max_level, decoder }
    }
}

impl ColumnLevelDecoder for DefinitionLevelBufferDecoder {
    type Buffer = DefinitionLevelBuffer;

    fn set_data(&mut self, encoding: Encoding, data: Bytes) {
        match &mut self.decoder {
            MaybePacked::Packed(d) => d.set_data(encoding, data),
            MaybePacked::Fallback(d) => d.set_data(encoding, data),
        }
    }
}

impl DefinitionLevelDecoder for DefinitionLevelBufferDecoder {
    fn read_def_levels(
        &mut self,
        writer: &mut Self::Buffer,
        num_levels: usize,
    ) -> Result<(usize, usize)> {
        match (&mut writer.inner, &mut self.decoder) {
            (
                BufferInner::Full {
                    levels,
                    nulls,
                    max_level,
                },
                MaybePacked::Fallback(decoder),
            ) => {
                assert_eq!(self.max_level, *max_level);

                let start = levels.len();
                let (values_read, levels_read) = decoder.read_def_levels(levels, num_levels)?;

                nulls.reserve(levels_read);
                for i in &levels[start..] {
                    nulls.append(i == max_level);
                }

                Ok((values_read, levels_read))
            }
            (BufferInner::Mask { nulls }, MaybePacked::Packed(decoder)) => {
                assert_eq!(self.max_level, 1);

                let start = nulls.len();
                let levels_read = decoder.read(nulls, num_levels)?;

                let values_read = count_set_bits(nulls.as_slice(), start..start + levels_read);
                Ok((values_read, levels_read))
            }
            _ => unreachable!("inconsistent null mask"),
        }
    }

    fn skip_def_levels(&mut self, num_levels: usize) -> Result<(usize, usize)> {
        match &mut self.decoder {
            MaybePacked::Fallback(decoder) => decoder.skip_def_levels(num_levels),
            MaybePacked::Packed(decoder) => decoder.skip(num_levels),
        }
    }
}

/// An optimized decoder for decoding [RLE] and [BIT_PACKED] data with a bit width of 1
/// directly into a bitmask
///
/// This is significantly faster than decoding the data into `[i16]` and then computing
/// a bitmask from this, as not only can it skip this buffer allocation and construction,
/// but it can exploit properties of the encoded data to reduce work further
///
/// In particular:
///
/// * Packed runs are already bitmask encoded and can simply be appended
/// * Runs of 1 or 0 bits can be efficiently appended with byte (or larger) operations
///
/// [RLE]: https://github.com/apache/parquet-format/blob/master/Encodings.md#run-length-encoding--bit-packing-hybrid-rle--3
/// [BIT_PACKED]: https://github.com/apache/parquet-format/blob/master/Encodings.md#bit-packed-deprecated-bit_packed--4
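///
/// For example, an RLE run of 8 set levels is encoded as the two bytes `0x10 0x01`
/// (header `8 << 1` followed by a single value byte) and can be appended with
/// `BooleanBufferBuilder::append_n`, whilst a single bit-packed group of 8 alternating
/// levels is encoded as `0x03 0b0101_0101` (header `(1 << 1) | 1` followed by one byte
/// of LSB-first values) and can be copied directly with
/// `BooleanBufferBuilder::append_packed_range`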
struct PackedDecoder {
    /// The encoded level data
    data: Bytes,
    /// The current byte offset into `data`
    data_offset: usize,
    /// The number of levels remaining in the current RLE run
    rle_left: usize,
    /// The repeated value of the current RLE run
    rle_value: bool,
    /// The total number of bits in the current bit-packed run
    packed_count: usize,
    /// The number of bits of the current bit-packed run already consumed
    packed_offset: usize,
}

impl PackedDecoder {
    fn next_rle_block(&mut self) -> Result<()> {
        let indicator_value = self.decode_header()?;
        if indicator_value & 1 == 1 {
            let len = (indicator_value >> 1) as usize;
            self.packed_count = len * 8;
            self.packed_offset = 0;
        } else {
            self.rle_left = (indicator_value >> 1) as usize;
            let byte = *self.data.as_ref().get(self.data_offset).ok_or_else(|| {
                ParquetError::EOF(
                    "unexpected end of file whilst decoding definition levels rle value".into(),
                )
            })?;

            self.data_offset += 1;
            self.rle_value = byte != 0;
        }
        Ok(())
    }

    /// Decodes a VLQ encoded little endian integer and returns it
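    ///
    /// For example, the bytes `0x96 0x01` decode to `150`: the low 7 bits of each byte are
    /// accumulated starting from the least significant group (`0b001_0110`, then `0b000_0001`
    /// shifted left by 7), and a clear high bit marks the final byte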
    fn decode_header(&mut self) -> Result<i64> {
        let mut offset = 0;
        let mut v: i64 = 0;
        while offset < 10 {
            let byte = *self
                .data
                .as_ref()
                .get(self.data_offset + offset)
                .ok_or_else(|| {
                    ParquetError::EOF(
                        "unexpected end of file whilst decoding definition levels rle header"
                            .into(),
                    )
                })?;

            v |= ((byte & 0x7F) as i64) << (offset * 7);
            offset += 1;
            if byte & 0x80 == 0 {
                self.data_offset += offset;
                return Ok(v);
            }
        }
        Err(general_err!("too many bytes for VLQ"))
    }
}

impl PackedDecoder {
    fn new() -> Self {
        Self {
            data: Bytes::from(vec![]),
            data_offset: 0,
            rle_left: 0,
            rle_value: false,
            packed_count: 0,
            packed_offset: 0,
        }
    }

    fn set_data(&mut self, encoding: Encoding, data: Bytes) {
        self.rle_left = 0;
        self.rle_value = false;
        self.packed_offset = 0;
        self.packed_count = match encoding {
            Encoding::RLE => 0,
            #[allow(deprecated)]
            Encoding::BIT_PACKED => data.len() * 8,
            _ => unreachable!("invalid level encoding: {}", encoding),
        };
        self.data = data;
        self.data_offset = 0;
    }

    fn read(&mut self, buffer: &mut BooleanBufferBuilder, len: usize) -> Result<usize> {
        let mut read = 0;
        while read != len {
            if self.rle_left != 0 {
                let to_read = self.rle_left.min(len - read);
                buffer.append_n(to_read, self.rle_value);
                self.rle_left -= to_read;
                read += to_read;
            } else if self.packed_count != self.packed_offset {
                let to_read = (self.packed_count - self.packed_offset).min(len - read);
                let offset = self.data_offset * 8 + self.packed_offset;
                buffer.append_packed_range(offset..offset + to_read, self.data.as_ref());
                self.packed_offset += to_read;
                read += to_read;

                if self.packed_offset == self.packed_count {
                    self.data_offset += self.packed_count / 8;
                }
            } else if self.data_offset == self.data.len() {
                break;
            } else {
                self.next_rle_block()?
            }
        }
        Ok(read)
    }

    /// Skips `level_num` definition levels
    ///
    /// Returns the number of values skipped and the number of levels skipped
    fn skip(&mut self, level_num: usize) -> Result<(usize, usize)> {
        let mut skipped_value = 0;
        let mut skipped_level = 0;
        while skipped_level != level_num {
            if self.rle_left != 0 {
                let to_skip = self.rle_left.min(level_num - skipped_level);
                self.rle_left -= to_skip;
                skipped_level += to_skip;
                if self.rle_value {
                    skipped_value += to_skip;
                }
            } else if self.packed_count != self.packed_offset {
                let to_skip =
                    (self.packed_count - self.packed_offset).min(level_num - skipped_level);
                let offset = self.data_offset * 8 + self.packed_offset;
                let bit_chunk = UnalignedBitChunk::new(self.data.as_ref(), offset, to_skip);
                skipped_value += bit_chunk.count_ones();
                self.packed_offset += to_skip;
                skipped_level += to_skip;
                if self.packed_offset == self.packed_count {
                    self.data_offset += self.packed_count / 8;
                }
            } else if self.data_offset == self.data.len() {
                break;
            } else {
                self.next_rle_block()?
            }
        }
        Ok((skipped_value, skipped_level))
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    use crate::encodings::rle::RleEncoder;
    use rand::{thread_rng, Rng};

    #[test]
    fn test_packed_decoder() {
        let mut rng = thread_rng();
        let len: usize = rng.gen_range(512..1024);

        let mut expected = BooleanBufferBuilder::new(len);
        let mut encoder = RleEncoder::new(1, 1024);
        for _ in 0..len {
            let bool = rng.gen_bool(0.8);
            encoder.put(bool as u64);
            expected.append(bool);
        }
        assert_eq!(expected.len(), len);

        let encoded = encoder.consume();
        let mut decoder = PackedDecoder::new();
        decoder.set_data(Encoding::RLE, encoded.into());

        // Decode data in random length intervals
        let mut decoded = BooleanBufferBuilder::new(len);
        loop {
            let remaining = len - decoded.len();
            if remaining == 0 {
                break;
            }

            let to_read = rng.gen_range(1..=remaining);
            decoder.read(&mut decoded, to_read).unwrap();
        }

        assert_eq!(decoded.len(), len);
        assert_eq!(decoded.as_slice(), expected.as_slice());
    }
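
    // A minimal hand-rolled example of the hybrid encoding handled by `PackedDecoder`,
    // using the byte layout described in its documentation: `0x10 0x01` is an RLE run of
    // 8 set levels, `0x03 0b0101_0101` is a single bit-packed group of 8 alternating
    // levels (LSB first)
    #[test]
    fn test_packed_decoder_hand_rolled() {
        let data = Bytes::from(vec![0x10, 0x01, 0x03, 0b0101_0101]);

        let mut decoder = PackedDecoder::new();
        decoder.set_data(Encoding::RLE, data.clone());

        // The RLE run is appended with `append_n`, the packed run with `append_packed_range`
        let mut decoded = BooleanBufferBuilder::new(16);
        assert_eq!(decoder.read(&mut decoded, 16).unwrap(), 16);

        let expected = [
            true, true, true, true, true, true, true, true, // RLE run of 8 set levels
            true, false, true, false, true, false, true, false, // bit-packed group
        ];
        for (idx, expected_bit) in expected.iter().enumerate() {
            assert_eq!(decoded.get_bit(idx), *expected_bit);
        }

        // Skipping also reports how many of the skipped levels were set
        let mut decoder = PackedDecoder::new();
        decoder.set_data(Encoding::RLE, data);
        let (skipped_values, skipped_levels) = decoder.skip(10).unwrap();
        assert_eq!(skipped_levels, 10);
        assert_eq!(skipped_values, 9); // 8 from the RLE run + the first packed bit
    }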

    #[test]
    fn test_packed_decoder_skip() {
        let mut rng = thread_rng();
        let len: usize = rng.gen_range(512..1024);

        let mut expected = BooleanBufferBuilder::new(len);
        let mut encoder = RleEncoder::new(1, 1024);

        let mut total_value = 0;
        for _ in 0..len {
            let bool = rng.gen_bool(0.8);
            encoder.put(bool as u64);
            expected.append(bool);
            if bool {
                total_value += 1;
            }
        }
        assert_eq!(expected.len(), len);

        let encoded = encoder.consume();
        let mut decoder = PackedDecoder::new();
        decoder.set_data(Encoding::RLE, encoded.into());

        let mut skip_value = 0;
        let mut read_value = 0;
        let mut skip_level = 0;
        let mut read_level = 0;

        loop {
            let offset = skip_level + read_level;
            let remaining_levels = len - offset;
            if remaining_levels == 0 {
                break;
            }
            let to_read_or_skip_level = rng.gen_range(1..=remaining_levels);
            if rng.gen_bool(0.5) {
                let (skip_val_num, skip_level_num) = decoder.skip(to_read_or_skip_level).unwrap();
                skip_value += skip_val_num;
                skip_level += skip_level_num;
            } else {
                let mut decoded = BooleanBufferBuilder::new(to_read_or_skip_level);
                let read_level_num = decoder.read(&mut decoded, to_read_or_skip_level).unwrap();
                read_level += read_level_num;
                for i in 0..read_level_num {
                    assert!(!decoded.is_empty());
                    // check each read bit
                    let read_bit = decoded.get_bit(i);
                    if read_bit {
                        read_value += 1;
                    }
                    let expect_bit = expected.get_bit(i + offset);
                    assert_eq!(read_bit, expect_bit);
                }
            }
        }
        assert_eq!(read_level + skip_level, len);
        assert_eq!(read_value + skip_value, total_value);
    }
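
    // A sketch of the null-mask fast path end to end, assuming a nullable INT32 leaf
    // column (max definition level 1, max repetition level 0); the column name "item"
    // and the level values below are arbitrary
    #[test]
    fn test_null_mask_fast_path() {
        use std::sync::Arc;

        use crate::basic::Repetition;
        use crate::schema::types::{ColumnDescriptor, ColumnPath, Type};

        let primitive = Type::primitive_type_builder("item", crate::basic::Type::INT32)
            .with_repetition(Repetition::OPTIONAL)
            .build()
            .unwrap();
        let descr = Arc::new(ColumnDescriptor::new(
            Arc::new(primitive),
            1,
            0,
            ColumnPath::new(vec!["item".to_string()]),
        ));

        // Encode definition levels 1, 1, 0, 1 with the RLE / bit-packing hybrid
        let mut encoder = RleEncoder::new(1, 1024);
        for level in [1u64, 1, 0, 1] {
            encoder.put(level);
        }

        let mut decoder = DefinitionLevelBufferDecoder::new(1, true);
        decoder.set_data(Encoding::RLE, encoder.consume().into());

        // With `null_mask_only` the levels are decoded straight into a null bitmask
        let mut buffer = DefinitionLevelBuffer::new(&descr, true);
        let (values, levels) = decoder.read_def_levels(&mut buffer, 4).unwrap();
        assert_eq!(values, 3); // three of the four levels are at the max level
        assert_eq!(levels, 4);

        // No `i16` levels are materialised, only the null bitmask
        assert!(buffer.consume_levels().is_none());
        let bitmask = buffer.consume_bitmask();
        assert_eq!(bitmask.as_slice()[0] & 0x0F, 0b1011);
    }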
}