parquet/arrow/record_reader/
definition_levels.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use arrow_array::builder::BooleanBufferBuilder;
19use arrow_buffer::Buffer;
20use arrow_buffer::bit_chunk_iterator::UnalignedBitChunk;
21use bytes::Bytes;
22
23use crate::arrow::buffer::bit_util::count_set_bits;
24use crate::basic::Encoding;
25use crate::column::reader::decoder::{
26    ColumnLevelDecoder, DefinitionLevelDecoder, DefinitionLevelDecoderImpl,
27};
28use crate::errors::{ParquetError, Result};
29use crate::schema::types::ColumnDescPtr;
30
/// Storage backing a [`DefinitionLevelBuffer`]
///
/// The variant is selected in [`DefinitionLevelBuffer::new`] from its `null_mask_only`
/// argument.
enum BufferInner {
    /// Compute levels and null mask
    Full {
        /// Decoded definition levels
        levels: Vec<i16>,
        /// Validity bits, a bit is set when the corresponding level equals `max_level`
        nulls: BooleanBufferBuilder,
        /// Maximum definition level of the column this buffer was created for
        max_level: i16,
    },
    /// Only compute null bitmask - requires max level to be 1
    ///
    /// This is an optimisation for the common case of a nullable scalar column, as decoding
    /// the definition level data is only required when decoding nested structures
    ///
    /// `nulls` holds the validity bits decoded directly from the level data
    Mask { nulls: BooleanBufferBuilder },
}
45
/// Accumulates decoded definition levels and/or the null bitmask derived from them
///
/// Whether the levels themselves are retained is decided at construction, see
/// [`DefinitionLevelBuffer::new`].
pub struct DefinitionLevelBuffer {
    /// Level and/or bitmask storage
    inner: BufferInner,

    /// The length of this buffer
    ///
    /// Note: `buffer` and `builder` may contain more elements
    ///
    /// NOTE(review): `buffer` and `builder` do not match any current field name,
    /// presumably they refer to the storage held in `inner` - confirm
    len: usize,
}
54
55impl DefinitionLevelBuffer {
56    pub fn new(desc: &ColumnDescPtr, null_mask_only: bool) -> Self {
57        let inner = match null_mask_only {
58            true => {
59                assert_eq!(
60                    desc.max_def_level(),
61                    1,
62                    "max definition level must be 1 to only compute null bitmask"
63                );
64
65                assert_eq!(
66                    desc.max_rep_level(),
67                    0,
68                    "max repetition level must be 0 to only compute null bitmask"
69                );
70
71                BufferInner::Mask {
72                    nulls: BooleanBufferBuilder::new(0),
73                }
74            }
75            false => BufferInner::Full {
76                levels: Vec::new(),
77                nulls: BooleanBufferBuilder::new(0),
78                max_level: desc.max_def_level(),
79            },
80        };
81
82        Self { inner, len: 0 }
83    }
84
85    /// Returns the built level data
86    pub fn consume_levels(&mut self) -> Option<Vec<i16>> {
87        match &mut self.inner {
88            BufferInner::Full { levels, .. } => Some(std::mem::take(levels)),
89            BufferInner::Mask { .. } => None,
90        }
91    }
92
93    /// Returns the built null bitmask
94    pub fn consume_bitmask(&mut self) -> Buffer {
95        self.len = 0;
96        match &mut self.inner {
97            BufferInner::Full { nulls, .. } => nulls.finish().into_inner(),
98            BufferInner::Mask { nulls } => nulls.finish().into_inner(),
99        }
100    }
101
102    pub fn nulls(&self) -> &BooleanBufferBuilder {
103        match &self.inner {
104            BufferInner::Full { nulls, .. } => nulls,
105            BufferInner::Mask { nulls } => nulls,
106        }
107    }
108}
109
/// The level decoder used by [`DefinitionLevelBufferDecoder`]
enum MaybePacked {
    /// Decodes level data directly into a bitmask, paired with [`BufferInner::Mask`]
    Packed(PackedDecoder),
    /// Decodes level data into `i16` levels, paired with [`BufferInner::Full`]
    Fallback(DefinitionLevelDecoderImpl),
}
114
/// Decodes definition level data into a [`DefinitionLevelBuffer`]
pub struct DefinitionLevelBufferDecoder {
    /// Maximum definition level, checked against the target buffer on every read
    max_level: i16,
    /// The underlying decoder, chosen at construction
    decoder: MaybePacked,
}
119
120impl DefinitionLevelBufferDecoder {
121    pub fn new(max_level: i16, packed: bool) -> Self {
122        let decoder = match packed {
123            true => MaybePacked::Packed(PackedDecoder::new()),
124            false => MaybePacked::Fallback(DefinitionLevelDecoderImpl::new(max_level)),
125        };
126
127        Self { max_level, decoder }
128    }
129}
130
131impl ColumnLevelDecoder for DefinitionLevelBufferDecoder {
132    type Buffer = DefinitionLevelBuffer;
133
134    fn set_data(&mut self, encoding: Encoding, data: Bytes) -> Result<()> {
135        match &mut self.decoder {
136            MaybePacked::Packed(d) => d.set_data(encoding, data),
137            MaybePacked::Fallback(d) => d.set_data(encoding, data)?,
138        };
139        Ok(())
140    }
141}
142
143impl DefinitionLevelDecoder for DefinitionLevelBufferDecoder {
144    fn read_def_levels(
145        &mut self,
146        writer: &mut Self::Buffer,
147        num_levels: usize,
148    ) -> Result<(usize, usize)> {
149        match (&mut writer.inner, &mut self.decoder) {
150            (
151                BufferInner::Full {
152                    levels,
153                    nulls,
154                    max_level,
155                },
156                MaybePacked::Fallback(decoder),
157            ) => {
158                assert_eq!(self.max_level, *max_level);
159
160                let start = levels.len();
161                let (values_read, levels_read) = decoder.read_def_levels(levels, num_levels)?;
162
163                nulls.reserve(levels_read);
164                for i in &levels[start..] {
165                    nulls.append(i == max_level);
166                }
167
168                Ok((values_read, levels_read))
169            }
170            (BufferInner::Mask { nulls }, MaybePacked::Packed(decoder)) => {
171                assert_eq!(self.max_level, 1);
172
173                let start = nulls.len();
174                let levels_read = decoder.read(nulls, num_levels)?;
175
176                let values_read = count_set_bits(nulls.as_slice(), start..start + levels_read);
177                Ok((values_read, levels_read))
178            }
179            _ => unreachable!("inconsistent null mask"),
180        }
181    }
182
183    fn skip_def_levels(&mut self, num_levels: usize) -> Result<(usize, usize)> {
184        match &mut self.decoder {
185            MaybePacked::Fallback(decoder) => decoder.skip_def_levels(num_levels),
186            MaybePacked::Packed(decoder) => decoder.skip(num_levels),
187        }
188    }
189}
190
/// An optimized decoder for decoding [RLE] and [BIT_PACKED] data with a bit width of 1
/// directly into a bitmask
///
/// This is significantly faster than decoding the data into `[i16]` and then computing
/// a bitmask from this, as not only can it skip this buffer allocation and construction,
/// but it can exploit properties of the encoded data to reduce work further
///
/// In particular:
///
/// * Packed runs are already bitmask encoded and can simply be appended
/// * Runs of 1 or 0 bits can be efficiently appended with byte (or larger) operations
///
/// [RLE]: https://github.com/apache/parquet-format/blob/master/Encodings.md#run-length-encoding--bit-packing-hybrid-rle--3
/// [BIT_PACKED]: https://github.com/apache/parquet-format/blob/master/Encodings.md#bit-packed-deprecated-bit_packed--4
struct PackedDecoder {
    /// The encoded level data of the current page
    data: Bytes,
    /// Byte offset into `data` of the next run header, or of the start of the
    /// packed run currently being consumed
    data_offset: usize,
    /// Number of values remaining in the current RLE run
    rle_left: usize,
    /// The value repeated by the current RLE run
    rle_value: bool,
    /// Total number of bits in the current packed run
    packed_count: usize,
    /// Number of bits of the current packed run that have already been consumed
    packed_offset: usize,
}
213
214impl PackedDecoder {
215    fn next_rle_block(&mut self) -> Result<()> {
216        let indicator_value = self.decode_header()?;
217        if indicator_value & 1 == 1 {
218            let len = (indicator_value >> 1) as usize;
219            self.packed_count = len * 8;
220            self.packed_offset = 0;
221        } else {
222            self.rle_left = (indicator_value >> 1) as usize;
223            let byte = *self.data.as_ref().get(self.data_offset).ok_or_else(|| {
224                ParquetError::EOF(
225                    "unexpected end of file whilst decoding definition levels rle value".into(),
226                )
227            })?;
228
229            self.data_offset += 1;
230            self.rle_value = byte != 0;
231        }
232        Ok(())
233    }
234
235    /// Decodes a VLQ encoded little endian integer and returns it
236    fn decode_header(&mut self) -> Result<i64> {
237        let mut offset = 0;
238        let mut v: i64 = 0;
239        while offset < 10 {
240            let byte = *self
241                .data
242                .as_ref()
243                .get(self.data_offset + offset)
244                .ok_or_else(|| {
245                    ParquetError::EOF(
246                        "unexpected end of file whilst decoding definition levels rle header"
247                            .into(),
248                    )
249                })?;
250
251            v |= ((byte & 0x7F) as i64) << (offset * 7);
252            offset += 1;
253            if byte & 0x80 == 0 {
254                self.data_offset += offset;
255                return Ok(v);
256            }
257        }
258        Err(general_err!("too many bytes for VLQ"))
259    }
260}
261
262impl PackedDecoder {
263    fn new() -> Self {
264        Self {
265            data: Bytes::from(vec![]),
266            data_offset: 0,
267            rle_left: 0,
268            rle_value: false,
269            packed_count: 0,
270            packed_offset: 0,
271        }
272    }
273
274    fn set_data(&mut self, encoding: Encoding, data: Bytes) {
275        self.rle_left = 0;
276        self.rle_value = false;
277        self.packed_offset = 0;
278        self.packed_count = match encoding {
279            Encoding::RLE => 0,
280            #[allow(deprecated)]
281            Encoding::BIT_PACKED => data.len() * 8,
282            _ => unreachable!("invalid level encoding: {}", encoding),
283        };
284        self.data = data;
285        self.data_offset = 0;
286    }
287
288    fn read(&mut self, buffer: &mut BooleanBufferBuilder, len: usize) -> Result<usize> {
289        let mut read = 0;
290        while read != len {
291            if self.rle_left != 0 {
292                let to_read = self.rle_left.min(len - read);
293                buffer.append_n(to_read, self.rle_value);
294                self.rle_left -= to_read;
295                read += to_read;
296            } else if self.packed_count != self.packed_offset {
297                let to_read = (self.packed_count - self.packed_offset).min(len - read);
298                let offset = self.data_offset * 8 + self.packed_offset;
299                buffer.append_packed_range(offset..offset + to_read, self.data.as_ref());
300                self.packed_offset += to_read;
301                read += to_read;
302
303                if self.packed_offset == self.packed_count {
304                    self.data_offset += self.packed_count / 8;
305                }
306            } else if self.data_offset == self.data.len() {
307                break;
308            } else {
309                self.next_rle_block()?
310            }
311        }
312        Ok(read)
313    }
314
315    /// Skips `level_num` definition levels
316    ///
317    /// Returns the number of values skipped and the number of levels skipped
318    fn skip(&mut self, level_num: usize) -> Result<(usize, usize)> {
319        let mut skipped_value = 0;
320        let mut skipped_level = 0;
321        while skipped_level != level_num {
322            if self.rle_left != 0 {
323                let to_skip = self.rle_left.min(level_num - skipped_level);
324                self.rle_left -= to_skip;
325                skipped_level += to_skip;
326                if self.rle_value {
327                    skipped_value += to_skip;
328                }
329            } else if self.packed_count != self.packed_offset {
330                let to_skip =
331                    (self.packed_count - self.packed_offset).min(level_num - skipped_level);
332                let offset = self.data_offset * 8 + self.packed_offset;
333                let bit_chunk = UnalignedBitChunk::new(self.data.as_ref(), offset, to_skip);
334                skipped_value += bit_chunk.count_ones();
335                self.packed_offset += to_skip;
336                skipped_level += to_skip;
337                if self.packed_offset == self.packed_count {
338                    self.data_offset += self.packed_count / 8;
339                }
340            } else if self.data_offset == self.data.len() {
341                break;
342            } else {
343                self.next_rle_block()?
344            }
345        }
346        Ok((skipped_value, skipped_level))
347    }
348}
349
#[cfg(test)]
mod tests {
    use super::*;

    use crate::encodings::rle::RleEncoder;
    use rand::{Rng, rng};

    /// Round-trips random bits through `RleEncoder` and `PackedDecoder::read`
    #[test]
    fn test_packed_decoder() {
        let mut generator = rng();
        let len: usize = generator.random_range(512..1024);

        let mut expected = BooleanBufferBuilder::new(len);
        let mut encoder = RleEncoder::new(1, 1024);
        for _ in 0..len {
            let is_set = generator.random_bool(0.8);
            encoder.put(is_set as u64);
            expected.append(is_set);
        }
        assert_eq!(expected.len(), len);

        let mut decoder = PackedDecoder::new();
        decoder.set_data(Encoding::RLE, encoder.consume().into());

        // Decode data in random length intervals
        let mut decoded = BooleanBufferBuilder::new(len);
        while decoded.len() != len {
            let remaining = len - decoded.len();
            let to_read = generator.random_range(1..=remaining);
            decoder.read(&mut decoded, to_read).unwrap();
        }

        assert_eq!(decoded.len(), len);
        assert_eq!(decoded.as_slice(), expected.as_slice());
    }

    /// Interleaves random reads and skips, checking every read bit and the value totals
    #[test]
    fn test_packed_decoder_skip() {
        let mut generator = rng();
        let len: usize = generator.random_range(512..1024);

        let mut expected = BooleanBufferBuilder::new(len);
        let mut encoder = RleEncoder::new(1, 1024);

        let mut total_value = 0;
        for _ in 0..len {
            let is_set = generator.random_bool(0.8);
            encoder.put(is_set as u64);
            expected.append(is_set);
            total_value += usize::from(is_set);
        }
        assert_eq!(expected.len(), len);

        let mut decoder = PackedDecoder::new();
        decoder.set_data(Encoding::RLE, encoder.consume().into());

        let (mut skip_value, mut skip_level) = (0, 0);
        let (mut read_value, mut read_level) = (0, 0);

        while skip_level + read_level != len {
            let offset = skip_level + read_level;
            let remaining_levels = len - offset;
            let chunk = generator.random_range(1..=remaining_levels);

            if generator.random_bool(0.5) {
                let (values, levels) = decoder.skip(chunk).unwrap();
                skip_value += values;
                skip_level += levels;
            } else {
                let mut decoded = BooleanBufferBuilder::new(chunk);
                let levels = decoder.read(&mut decoded, chunk).unwrap();
                read_level += levels;
                for i in 0..levels {
                    assert!(!decoded.is_empty());
                    // Check each read bit against the bit that was encoded
                    let read_bit = decoded.get_bit(i);
                    read_value += usize::from(read_bit);
                    let expect_bit = expected.get_bit(i + offset);
                    assert_eq!(read_bit, expect_bit);
                }
            }
        }
        assert_eq!(read_level + skip_level, len);
        assert_eq!(read_value + skip_value, total_value);
    }
}