parquet/arrow/record_reader/definition_levels.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow_array::builder::BooleanBufferBuilder;
use arrow_buffer::Buffer;
use arrow_buffer::bit_chunk_iterator::UnalignedBitChunk;
use bytes::Bytes;

use crate::arrow::buffer::bit_util::count_set_bits;
use crate::basic::Encoding;
use crate::column::reader::decoder::{
    ColumnLevelDecoder, DefinitionLevelDecoder, DefinitionLevelDecoderImpl,
};
use crate::errors::{ParquetError, Result};
use crate::schema::types::ColumnDescPtr;

enum BufferInner {
    /// Compute levels and null mask
    Full {
        levels: Vec<i16>,
        nulls: BooleanBufferBuilder,
        max_level: i16,
    },
    /// Only compute null bitmask - requires max level to be 1
    ///
    /// This is an optimisation for the common case of a nullable scalar column, as decoding
    /// the definition level data is only required when decoding nested structures
    ///
    Mask { nulls: BooleanBufferBuilder },
}
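
// Illustrative example (added commentary, not from the original source): for a
// nullable scalar column with `max_def_level == 1`, the definition levels
// [1, 0, 1, 1] encode exactly the null mask [true, false, true, true], so the
// `Mask` variant can skip materialising the `Vec<i16>` of levels entirely.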

pub struct DefinitionLevelBuffer {
    inner: BufferInner,

    /// The length of this buffer
    ///
    /// Note: the builders in `inner` may contain more elements than `len`
    len: usize,
}

impl DefinitionLevelBuffer {
    pub fn new(desc: &ColumnDescPtr, null_mask_only: bool) -> Self {
        let inner = match null_mask_only {
            true => {
                assert_eq!(
                    desc.max_def_level(),
                    1,
                    "max definition level must be 1 to only compute null bitmask"
                );

                assert_eq!(
                    desc.max_rep_level(),
                    0,
                    "max repetition level must be 0 to only compute null bitmask"
                );

                BufferInner::Mask {
                    nulls: BooleanBufferBuilder::new(0),
                }
            }
            false => BufferInner::Full {
                levels: Vec::new(),
                nulls: BooleanBufferBuilder::new(0),
                max_level: desc.max_def_level(),
            },
        };

        Self { inner, len: 0 }
    }

    /// Returns the built level data
    pub fn consume_levels(&mut self) -> Option<Vec<i16>> {
        match &mut self.inner {
            BufferInner::Full { levels, .. } => Some(std::mem::take(levels)),
            BufferInner::Mask { .. } => None,
        }
    }

    /// Returns the built null bitmask, or `None` if no bitmask was constructed
    pub fn consume_bitmask(&mut self) -> Option<Buffer> {
        self.len = 0;
        let nulls = match &mut self.inner {
            BufferInner::Full { nulls, .. } => nulls,
            BufferInner::Mask { nulls } => nulls,
        };

        // Always call finish() to reset the builder state for the next batch.
        let buffer = nulls.finish().into_inner();

        // If no bitmap was constructed, return None
        if buffer.is_empty() {
            return None;
        }

        Some(buffer)
    }

    pub fn nulls(&self) -> &BooleanBufferBuilder {
        match &self.inner {
            BufferInner::Full { nulls, .. } => nulls,
            BufferInner::Mask { nulls } => nulls,
        }
    }
}
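
// Illustrative call sequence (added commentary; the caller sketched here is an
// assumption, not shown in this file): a record reader repeatedly fills the
// buffer via `DefinitionLevelDecoder::read_def_levels`, then drains a completed
// batch with `consume_levels` (always `None` in `Mask` mode) and `consume_bitmask`.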

enum MaybePacked {
    Packed(PackedDecoder),
    Fallback(DefinitionLevelDecoderImpl),
}

pub struct DefinitionLevelBufferDecoder {
    max_level: i16,
    decoder: MaybePacked,
}

impl DefinitionLevelBufferDecoder {
    pub fn new(max_level: i16, packed: bool) -> Self {
        let decoder = match packed {
            true => MaybePacked::Packed(PackedDecoder::new()),
            false => MaybePacked::Fallback(DefinitionLevelDecoderImpl::new(max_level)),
        };

        Self { max_level, decoder }
    }
}

impl ColumnLevelDecoder for DefinitionLevelBufferDecoder {
    type Buffer = DefinitionLevelBuffer;

    fn set_data(&mut self, encoding: Encoding, data: Bytes) -> Result<()> {
        match &mut self.decoder {
            MaybePacked::Packed(d) => d.set_data(encoding, data),
            MaybePacked::Fallback(d) => d.set_data(encoding, data)?,
        };
        Ok(())
    }
}

impl DefinitionLevelDecoder for DefinitionLevelBufferDecoder {
    fn read_def_levels(
        &mut self,
        writer: &mut Self::Buffer,
        num_levels: usize,
    ) -> Result<(usize, usize)> {
        match (&mut writer.inner, &mut self.decoder) {
            (
                BufferInner::Full {
                    levels,
                    nulls,
                    max_level,
                },
                MaybePacked::Fallback(decoder),
            ) => {
                assert_eq!(self.max_level, *max_level);

                let start = levels.len();
                let (values_read, levels_read) = decoder.read_def_levels(levels, num_levels)?;

                // Safety: slice iterator has a trusted length
                unsafe {
                    nulls
                        .extend_trusted_len(levels[start..].iter().map(|level| level == max_level));
                }

                Ok((values_read, levels_read))
            }
            (BufferInner::Mask { nulls }, MaybePacked::Packed(decoder)) => {
                assert_eq!(self.max_level, 1);

                // Fast path: if all requested levels are valid (max definition level),
                // we can skip RLE decoding entirely and just append all-ones to the bitmap.
                if let Some(count) = decoder.try_consume_all_valid(num_levels)? {
                    nulls.append_n(count, true);
                    return Ok((count, count)); // values_read == levels_read when all valid
                }

                // Normal path: decode RLE data into the bitmap
                let start = nulls.len();
                let levels_read = decoder.read(nulls, num_levels)?;

                let values_read = count_set_bits(nulls.as_slice(), start..start + levels_read);
                Ok((values_read, levels_read))
            }
            _ => unreachable!("inconsistent null mask"),
        }
    }

    fn skip_def_levels(&mut self, num_levels: usize) -> Result<(usize, usize)> {
        match &mut self.decoder {
            MaybePacked::Fallback(decoder) => decoder.skip_def_levels(num_levels),
            MaybePacked::Packed(decoder) => decoder.skip(num_levels),
        }
    }
}

/// An optimized decoder for decoding [RLE] and [BIT_PACKED] data with a bit width of 1
/// directly into a bitmask
///
/// This is significantly faster than decoding the data into `[i16]` and then computing
/// a bitmask from this, as not only can it skip this buffer allocation and construction,
/// but it can exploit properties of the encoded data to reduce work further
///
/// In particular:
///
/// * Packed runs are already bitmask encoded and can simply be appended
/// * Runs of 1 or 0 bits can be efficiently appended with byte (or larger) operations
///
/// [RLE]: https://github.com/apache/parquet-format/blob/master/Encodings.md#run-length-encoding--bit-packing-hybrid-rle--3
/// [BIT_PACKED]: https://github.com/apache/parquet-format/blob/master/Encodings.md#bit-packed-deprecated-bit_packed--4
struct PackedDecoder {
    data: Bytes,
    data_offset: usize,
    rle_left: usize,
    rle_value: bool,
    packed_count: usize,
    packed_offset: usize,
}
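
// Worked example of the hybrid encoding consumed by this decoder (byte values
// illustrative, format per the Parquet spec linked above): each run starts with
// a ULEB128 header whose low bit selects the run type. A header of 0x10 (low bit
// 0) is an RLE run of 0x10 >> 1 = 8 repetitions of the value byte that follows,
// while a header of 0x05 (low bit 1) is a bit-packed run of (0x05 >> 1) * 8 = 16
// values whose bits follow directly and can be appended to the bitmask verbatim.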

impl PackedDecoder {
    fn next_rle_block(&mut self) -> Result<()> {
        let indicator_value = self.decode_header()?;
        if indicator_value & 1 == 1 {
            let len = (indicator_value >> 1) as usize;
            self.packed_count = len * 8;
            self.packed_offset = 0;
        } else {
            self.rle_left = (indicator_value >> 1) as usize;
            let byte = *self.data.as_ref().get(self.data_offset).ok_or_else(|| {
                ParquetError::EOF(
                    "unexpected end of file whilst decoding definition levels rle value".into(),
                )
            })?;

            self.data_offset += 1;
            self.rle_value = byte != 0;
        }
        Ok(())
    }

    /// Decodes a VLQ encoded little endian integer and returns it
    fn decode_header(&mut self) -> Result<i64> {
        let mut offset = 0;
        let mut v: i64 = 0;
        while offset < 10 {
            let byte = *self
                .data
                .as_ref()
                .get(self.data_offset + offset)
                .ok_or_else(|| {
                    ParquetError::EOF(
                        "unexpected end of file whilst decoding definition levels rle header"
                            .into(),
                    )
                })?;

            v |= ((byte & 0x7F) as i64) << (offset * 7);
            offset += 1;
            if byte & 0x80 == 0 {
                self.data_offset += offset;
                return Ok(v);
            }
        }
        Err(general_err!("too many bytes for VLQ"))
    }
}
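
// ULEB128 walkthrough for `decode_header` (illustrative): the bytes [0x96, 0x01]
// decode to (0x96 & 0x7F) | (0x01 << 7) = 22 + 128 = 150. The set high bit of
// 0x96 marks a continuation byte; the clear high bit of 0x01 ends the integer.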

impl PackedDecoder {
    fn new() -> Self {
        Self {
            data: Bytes::from(vec![]),
            data_offset: 0,
            rle_left: 0,
            rle_value: false,
            packed_count: 0,
            packed_offset: 0,
        }
    }

    fn set_data(&mut self, encoding: Encoding, data: Bytes) {
        self.rle_left = 0;
        self.rle_value = false;
        self.packed_offset = 0;
        self.packed_count = match encoding {
            Encoding::RLE => 0,
            #[allow(deprecated)]
            Encoding::BIT_PACKED => data.len() * 8,
            _ => unreachable!("invalid level encoding: {}", encoding),
        };
        self.data = data;
        self.data_offset = 0;
    }

    /// Try to consume `len` levels if all of them are valid (at the max definition level).
    ///
    /// Returns `Ok(Some(count))` if it successfully consumed `count` all-valid levels.
    /// Returns `Ok(None)` if nulls or packed data prevent the fast path.
    ///
    /// Note: on `None`, the decoder state may have advanced to the next RLE block,
    /// but only if `rle_left` was zero (i.e., the block would have been loaded
    /// on the next read anyway).
    fn try_consume_all_valid(&mut self, len: usize) -> Result<Option<usize>> {
        // If no active run and no packed data pending, try to parse the next RLE block
        if self.rle_left == 0 && self.packed_count == self.packed_offset {
            if self.data_offset < self.data.len() {
                self.next_rle_block()?;
            } else {
                // No more data available
                return Ok(None);
            }
        }

        // Fast path only works when we have an active RLE run of true values
        // that covers the entire requested length.
        if self.rle_left >= len && self.rle_value {
            self.rle_left -= len;
            return Ok(Some(len));
        }

        // Any other case (null run, packed data, or insufficient length)
        // falls back to the normal path
        Ok(None)
    }

    fn read(&mut self, buffer: &mut BooleanBufferBuilder, len: usize) -> Result<usize> {
        let mut read = 0;
        while read != len {
            if self.rle_left != 0 {
                let to_read = self.rle_left.min(len - read);
                buffer.append_n(to_read, self.rle_value);
                self.rle_left -= to_read;
                read += to_read;
            } else if self.packed_count != self.packed_offset {
                let to_read = (self.packed_count - self.packed_offset).min(len - read);
                let offset = self.data_offset * 8 + self.packed_offset;
                buffer.append_packed_range(offset..offset + to_read, self.data.as_ref());
                self.packed_offset += to_read;
                read += to_read;

                if self.packed_offset == self.packed_count {
                    self.data_offset += self.packed_count / 8;
                }
            } else if self.data_offset == self.data.len() {
                break;
            } else {
                self.next_rle_block()?
            }
        }
        Ok(read)
    }

    /// Skips `level_num` definition levels
    ///
    /// Returns the number of values skipped and the number of levels skipped
    fn skip(&mut self, level_num: usize) -> Result<(usize, usize)> {
        let mut skipped_value = 0;
        let mut skipped_level = 0;
        while skipped_level != level_num {
            if self.rle_left != 0 {
                let to_skip = self.rle_left.min(level_num - skipped_level);
                self.rle_left -= to_skip;
                skipped_level += to_skip;
                if self.rle_value {
                    skipped_value += to_skip;
                }
            } else if self.packed_count != self.packed_offset {
                let to_skip =
                    (self.packed_count - self.packed_offset).min(level_num - skipped_level);
                let offset = self.data_offset * 8 + self.packed_offset;
                let bit_chunk = UnalignedBitChunk::new(self.data.as_ref(), offset, to_skip);
                skipped_value += bit_chunk.count_ones();
                self.packed_offset += to_skip;
                skipped_level += to_skip;
                if self.packed_offset == self.packed_count {
                    self.data_offset += self.packed_count / 8;
                }
            } else if self.data_offset == self.data.len() {
                break;
            } else {
                self.next_rle_block()?
            }
        }
        Ok((skipped_value, skipped_level))
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    use crate::encodings::rle::RleEncoder;
    use rand::{Rng, rng};

    #[test]
    fn test_packed_decoder() {
        let mut rng = rng();
        let len: usize = rng.random_range(512..1024);

        let mut expected = BooleanBufferBuilder::new(len);
        let mut encoder = RleEncoder::new(1, 1024);
        for _ in 0..len {
            let bool = rng.random_bool(0.8);
            encoder.put(bool as u64);
            expected.append(bool);
        }
        assert_eq!(expected.len(), len);

        let encoded = encoder.consume();
        let mut decoder = PackedDecoder::new();
        decoder.set_data(Encoding::RLE, encoded.into());

        // Decode data in random length intervals
        let mut decoded = BooleanBufferBuilder::new(len);
        loop {
            let remaining = len - decoded.len();
            if remaining == 0 {
                break;
            }

            let to_read = rng.random_range(1..=remaining);
            decoder.read(&mut decoded, to_read).unwrap();
        }

        assert_eq!(decoded.len(), len);
        assert_eq!(decoded.as_slice(), expected.as_slice());
    }

    #[test]
    fn test_packed_decoder_skip() {
        let mut rng = rng();
        let len: usize = rng.random_range(512..1024);

        let mut expected = BooleanBufferBuilder::new(len);
        let mut encoder = RleEncoder::new(1, 1024);

        let mut total_value = 0;
        for _ in 0..len {
            let bool = rng.random_bool(0.8);
            encoder.put(bool as u64);
            expected.append(bool);
            if bool {
                total_value += 1;
            }
        }
        assert_eq!(expected.len(), len);

        let encoded = encoder.consume();
        let mut decoder = PackedDecoder::new();
        decoder.set_data(Encoding::RLE, encoded.into());

        let mut skip_value = 0;
        let mut read_value = 0;
        let mut skip_level = 0;
        let mut read_level = 0;

        loop {
            let offset = skip_level + read_level;
            let remaining_levels = len - offset;
            if remaining_levels == 0 {
                break;
            }
            let to_read_or_skip_level = rng.random_range(1..=remaining_levels);
            if rng.random_bool(0.5) {
                let (skip_val_num, skip_level_num) = decoder.skip(to_read_or_skip_level).unwrap();
                skip_value += skip_val_num;
                skip_level += skip_level_num;
            } else {
                let mut decoded = BooleanBufferBuilder::new(to_read_or_skip_level);
                let read_level_num = decoder.read(&mut decoded, to_read_or_skip_level).unwrap();
                read_level += read_level_num;
                for i in 0..read_level_num {
                    assert!(!decoded.is_empty());
                    // check each read bit
                    let read_bit = decoded.get_bit(i);
                    if read_bit {
                        read_value += 1;
                    }
                    let expect_bit = expected.get_bit(i + offset);
                    assert_eq!(read_bit, expect_bit);
                }
            }
        }
        assert_eq!(read_level + skip_level, len);
        assert_eq!(read_value + skip_value, total_value);
    }

    #[test]
    fn test_try_consume_all_valid() {
        // Test with all-valid data (all 1s) - single RLE run
        let len = 100;
        let mut encoder = RleEncoder::new(1, 1024);
        for _ in 0..len {
            encoder.put(1); // all valid
        }
        let encoded = encoder.consume();
        let mut decoder = PackedDecoder::new();
        decoder.set_data(Encoding::RLE, encoded.into());

        // try_consume_all_valid parses the RLE block itself, no need to read first
        let result = decoder.try_consume_all_valid(len).unwrap();
        assert_eq!(result, Some(len));

        // Test with all-null data (all 0s)
        let mut encoder = RleEncoder::new(1, 1024);
        for _ in 0..len {
            encoder.put(0); // all null
        }
        let encoded = encoder.consume();
        let mut decoder = PackedDecoder::new();
        decoder.set_data(Encoding::RLE, encoded.into());

        // Should return None because rle_value is false (all nulls)
        let result = decoder.try_consume_all_valid(len).unwrap();
        assert_eq!(result, None);

        // Test when requesting more than available in the current RLE run
        let mut encoder = RleEncoder::new(1, 1024);
        for _ in 0..10 {
            encoder.put(1); // small run of valid
        }
        for _ in 0..10 {
            encoder.put(0); // followed by nulls
        }
        let encoded = encoder.consume();
        let mut decoder = PackedDecoder::new();
        decoder.set_data(Encoding::RLE, encoded.into());

        // Request more than the valid run - should return None
        // (because we don't look ahead to the next block)
        let result = decoder.try_consume_all_valid(20).unwrap();
        assert_eq!(result, None);

        // Reset the decoder and request within the run
        decoder.set_data(Encoding::RLE, {
            let mut encoder = RleEncoder::new(1, 1024);
            for _ in 0..10 {
                encoder.put(1);
            }
            for _ in 0..10 {
                encoder.put(0);
            }
            encoder.consume().into()
        });

        let result = decoder.try_consume_all_valid(5).unwrap();
        assert_eq!(result, Some(5));

        // After consuming 5, there should be 5 left in the valid RLE run
        let result = decoder.try_consume_all_valid(5).unwrap();
        assert_eq!(result, Some(5));

        // Now the valid run is exhausted, the next call should parse the null run and return None
        let result = decoder.try_consume_all_valid(5).unwrap();
        assert_eq!(result, None);
    }
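
    // A supplementary test sketch (added for illustration, not from the original
    // suite): exercises the deprecated BIT_PACKED path, where `set_data` treats
    // the whole buffer as a single packed run of `data.len() * 8` values and
    // `read` appends the bits to the bitmask verbatim (LSB first, matching
    // `append_packed_range`).
    #[test]
    #[allow(deprecated)]
    fn test_bit_packed_decoding() {
        let data = Bytes::from(vec![0b1010_1010, 0b0000_1111]);
        let mut decoder = PackedDecoder::new();
        decoder.set_data(Encoding::BIT_PACKED, data);

        let mut decoded = BooleanBufferBuilder::new(16);
        assert_eq!(decoder.read(&mut decoded, 16).unwrap(), 16);

        let expected = [
            false, true, false, true, false, true, false, true, // 0b1010_1010
            true, true, true, true, false, false, false, false, // 0b0000_1111
        ];
        for (i, &bit) in expected.iter().enumerate() {
            assert_eq!(decoded.get_bit(i), bit, "bit {i}");
        }
    }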
}