parquet/arrow/record_reader/
definition_levels.rs1use arrow_array::builder::BooleanBufferBuilder;
19use arrow_buffer::Buffer;
20use arrow_buffer::bit_chunk_iterator::UnalignedBitChunk;
21use bytes::Bytes;
22
23use crate::arrow::buffer::bit_util::count_set_bits;
24use crate::basic::Encoding;
25use crate::column::reader::decoder::{
26 ColumnLevelDecoder, DefinitionLevelDecoder, DefinitionLevelDecoderImpl,
27};
28use crate::errors::{ParquetError, Result};
29use crate::schema::types::ColumnDescPtr;
30
31enum BufferInner {
32 Full {
34 levels: Vec<i16>,
35 nulls: BooleanBufferBuilder,
36 max_level: i16,
37 },
38 Mask { nulls: BooleanBufferBuilder },
44}
45
46pub struct DefinitionLevelBuffer {
47 inner: BufferInner,
48
49 len: usize,
53}
54
55impl DefinitionLevelBuffer {
56 pub fn new(desc: &ColumnDescPtr, null_mask_only: bool) -> Self {
57 let inner = match null_mask_only {
58 true => {
59 assert_eq!(
60 desc.max_def_level(),
61 1,
62 "max definition level must be 1 to only compute null bitmask"
63 );
64
65 assert_eq!(
66 desc.max_rep_level(),
67 0,
68 "max repetition level must be 0 to only compute null bitmask"
69 );
70
71 BufferInner::Mask {
72 nulls: BooleanBufferBuilder::new(0),
73 }
74 }
75 false => BufferInner::Full {
76 levels: Vec::new(),
77 nulls: BooleanBufferBuilder::new(0),
78 max_level: desc.max_def_level(),
79 },
80 };
81
82 Self { inner, len: 0 }
83 }
84
85 pub fn consume_levels(&mut self) -> Option<Vec<i16>> {
87 match &mut self.inner {
88 BufferInner::Full { levels, .. } => Some(std::mem::take(levels)),
89 BufferInner::Mask { .. } => None,
90 }
91 }
92
93 pub fn consume_bitmask(&mut self) -> Option<Buffer> {
95 self.len = 0;
96 let nulls = match &mut self.inner {
97 BufferInner::Full { nulls, .. } => nulls,
98 BufferInner::Mask { nulls } => nulls,
99 };
100
101 let buffer = nulls.finish().into_inner();
103
104 if buffer.is_empty() {
106 return None;
107 }
108
109 Some(buffer)
110 }
111
112 pub fn nulls(&self) -> &BooleanBufferBuilder {
113 match &self.inner {
114 BufferInner::Full { nulls, .. } => nulls,
115 BufferInner::Mask { nulls } => nulls,
116 }
117 }
118}
119
120enum MaybePacked {
121 Packed(PackedDecoder),
122 Fallback(DefinitionLevelDecoderImpl),
123}
124
125pub struct DefinitionLevelBufferDecoder {
126 max_level: i16,
127 decoder: MaybePacked,
128}
129
130impl DefinitionLevelBufferDecoder {
131 pub fn new(max_level: i16, packed: bool) -> Self {
132 let decoder = match packed {
133 true => MaybePacked::Packed(PackedDecoder::new()),
134 false => MaybePacked::Fallback(DefinitionLevelDecoderImpl::new(max_level)),
135 };
136
137 Self { max_level, decoder }
138 }
139}
140
141impl ColumnLevelDecoder for DefinitionLevelBufferDecoder {
142 type Buffer = DefinitionLevelBuffer;
143
144 fn set_data(&mut self, encoding: Encoding, data: Bytes) -> Result<()> {
145 match &mut self.decoder {
146 MaybePacked::Packed(d) => d.set_data(encoding, data),
147 MaybePacked::Fallback(d) => d.set_data(encoding, data)?,
148 };
149 Ok(())
150 }
151}
152
153impl DefinitionLevelDecoder for DefinitionLevelBufferDecoder {
154 fn read_def_levels(
155 &mut self,
156 writer: &mut Self::Buffer,
157 num_levels: usize,
158 ) -> Result<(usize, usize)> {
159 match (&mut writer.inner, &mut self.decoder) {
160 (
161 BufferInner::Full {
162 levels,
163 nulls,
164 max_level,
165 },
166 MaybePacked::Fallback(decoder),
167 ) => {
168 assert_eq!(self.max_level, *max_level);
169
170 let start = levels.len();
171 let (values_read, levels_read) = decoder.read_def_levels(levels, num_levels)?;
172
173 unsafe {
175 nulls
176 .extend_trusted_len(levels[start..].iter().map(|level| level == max_level));
177 }
178
179 Ok((values_read, levels_read))
180 }
181 (BufferInner::Mask { nulls }, MaybePacked::Packed(decoder)) => {
182 assert_eq!(self.max_level, 1);
183
184 if let Some(count) = decoder.try_consume_all_valid(num_levels)? {
188 nulls.append_n(count, true);
189 return Ok((count, count)); }
191
192 let start = nulls.len();
194 let levels_read = decoder.read(nulls, num_levels)?;
195
196 let values_read = count_set_bits(nulls.as_slice(), start..start + levels_read);
197 Ok((values_read, levels_read))
198 }
199 _ => unreachable!("inconsistent null mask"),
200 }
201 }
202
203 fn skip_def_levels(&mut self, num_levels: usize) -> Result<(usize, usize)> {
204 match &mut self.decoder {
205 MaybePacked::Fallback(decoder) => decoder.skip_def_levels(num_levels),
206 MaybePacked::Packed(decoder) => decoder.skip(num_levels),
207 }
208 }
209}
210
211struct PackedDecoder {
226 data: Bytes,
227 data_offset: usize,
228 rle_left: usize,
229 rle_value: bool,
230 packed_count: usize,
231 packed_offset: usize,
232}
233
234impl PackedDecoder {
235 fn next_rle_block(&mut self) -> Result<()> {
236 let indicator_value = self.decode_header()?;
237 if indicator_value & 1 == 1 {
238 let len = (indicator_value >> 1) as usize;
239 self.packed_count = len * 8;
240 self.packed_offset = 0;
241 } else {
242 self.rle_left = (indicator_value >> 1) as usize;
243 let byte = *self.data.as_ref().get(self.data_offset).ok_or_else(|| {
244 ParquetError::EOF(
245 "unexpected end of file whilst decoding definition levels rle value".into(),
246 )
247 })?;
248
249 self.data_offset += 1;
250 self.rle_value = byte != 0;
251 }
252 Ok(())
253 }
254
255 fn decode_header(&mut self) -> Result<i64> {
257 let mut offset = 0;
258 let mut v: i64 = 0;
259 while offset < 10 {
260 let byte = *self
261 .data
262 .as_ref()
263 .get(self.data_offset + offset)
264 .ok_or_else(|| {
265 ParquetError::EOF(
266 "unexpected end of file whilst decoding definition levels rle header"
267 .into(),
268 )
269 })?;
270
271 v |= ((byte & 0x7F) as i64) << (offset * 7);
272 offset += 1;
273 if byte & 0x80 == 0 {
274 self.data_offset += offset;
275 return Ok(v);
276 }
277 }
278 Err(general_err!("too many bytes for VLQ"))
279 }
280}
281
282impl PackedDecoder {
283 fn new() -> Self {
284 Self {
285 data: Bytes::from(vec![]),
286 data_offset: 0,
287 rle_left: 0,
288 rle_value: false,
289 packed_count: 0,
290 packed_offset: 0,
291 }
292 }
293
294 fn set_data(&mut self, encoding: Encoding, data: Bytes) {
295 self.rle_left = 0;
296 self.rle_value = false;
297 self.packed_offset = 0;
298 self.packed_count = match encoding {
299 Encoding::RLE => 0,
300 #[allow(deprecated)]
301 Encoding::BIT_PACKED => data.len() * 8,
302 _ => unreachable!("invalid level encoding: {}", encoding),
303 };
304 self.data = data;
305 self.data_offset = 0;
306 }
307
308 fn try_consume_all_valid(&mut self, len: usize) -> Result<Option<usize>> {
317 if self.rle_left == 0 && self.packed_count == self.packed_offset {
319 if self.data_offset < self.data.len() {
320 self.next_rle_block()?;
321 } else {
322 return Ok(None);
324 }
325 }
326
327 if self.rle_left >= len && self.rle_value {
330 self.rle_left -= len;
331 return Ok(Some(len));
332 }
333
334 Ok(None)
337 }
338
339 fn read(&mut self, buffer: &mut BooleanBufferBuilder, len: usize) -> Result<usize> {
340 let mut read = 0;
341 while read != len {
342 if self.rle_left != 0 {
343 let to_read = self.rle_left.min(len - read);
344 buffer.append_n(to_read, self.rle_value);
345 self.rle_left -= to_read;
346 read += to_read;
347 } else if self.packed_count != self.packed_offset {
348 let to_read = (self.packed_count - self.packed_offset).min(len - read);
349 let offset = self.data_offset * 8 + self.packed_offset;
350 buffer.append_packed_range(offset..offset + to_read, self.data.as_ref());
351 self.packed_offset += to_read;
352 read += to_read;
353
354 if self.packed_offset == self.packed_count {
355 self.data_offset += self.packed_count / 8;
356 }
357 } else if self.data_offset == self.data.len() {
358 break;
359 } else {
360 self.next_rle_block()?
361 }
362 }
363 Ok(read)
364 }
365
366 fn skip(&mut self, level_num: usize) -> Result<(usize, usize)> {
370 let mut skipped_value = 0;
371 let mut skipped_level = 0;
372 while skipped_level != level_num {
373 if self.rle_left != 0 {
374 let to_skip = self.rle_left.min(level_num - skipped_level);
375 self.rle_left -= to_skip;
376 skipped_level += to_skip;
377 if self.rle_value {
378 skipped_value += to_skip;
379 }
380 } else if self.packed_count != self.packed_offset {
381 let to_skip =
382 (self.packed_count - self.packed_offset).min(level_num - skipped_level);
383 let offset = self.data_offset * 8 + self.packed_offset;
384 let bit_chunk = UnalignedBitChunk::new(self.data.as_ref(), offset, to_skip);
385 skipped_value += bit_chunk.count_ones();
386 self.packed_offset += to_skip;
387 skipped_level += to_skip;
388 if self.packed_offset == self.packed_count {
389 self.data_offset += self.packed_count / 8;
390 }
391 } else if self.data_offset == self.data.len() {
392 break;
393 } else {
394 self.next_rle_block()?
395 }
396 }
397 Ok((skipped_value, skipped_level))
398 }
399}
400
#[cfg(test)]
mod tests {
    use super::*;

    use crate::encodings::rle::RleEncoder;
    use rand::{Rng, rng};

    /// RLE-encodes `len` random single-bit levels, each set with probability 0.8.
    ///
    /// Returns the encoded bytes, the expected bitmask and the number of set levels.
    fn random_levels(rng: &mut impl Rng, len: usize) -> (Bytes, BooleanBufferBuilder, usize) {
        let mut expected = BooleanBufferBuilder::new(len);
        let mut encoder = RleEncoder::new(1, 1024);
        let mut set_count = 0;
        for _ in 0..len {
            let valid = rng.random_bool(0.8);
            encoder.put(valid as u64);
            expected.append(valid);
            if valid {
                set_count += 1;
            }
        }
        assert_eq!(expected.len(), len);

        (encoder.consume().into(), expected, set_count)
    }

    /// RLE-encodes the given `(level, repetitions)` runs back to back.
    fn rle_runs(runs: &[(u64, usize)]) -> Bytes {
        let mut encoder = RleEncoder::new(1, 1024);
        for &(level, repetitions) in runs {
            for _ in 0..repetitions {
                encoder.put(level);
            }
        }
        encoder.consume().into()
    }

    /// Builds a decoder positioned at the start of RLE-encoded `data`.
    fn rle_decoder(data: Bytes) -> PackedDecoder {
        let mut decoder = PackedDecoder::new();
        decoder.set_data(Encoding::RLE, data);
        decoder
    }

    #[test]
    fn test_packed_decoder() {
        let mut rng = rng();
        let len: usize = rng.random_range(512..1024);

        let (encoded, expected, _) = random_levels(&mut rng, len);
        let mut decoder = rle_decoder(encoded);

        // Read everything back in randomly sized pieces.
        let mut decoded = BooleanBufferBuilder::new(len);
        while decoded.len() != len {
            let remaining = len - decoded.len();
            let to_read = rng.random_range(1..=remaining);
            decoder.read(&mut decoded, to_read).unwrap();
        }

        assert_eq!(decoded.len(), len);
        assert_eq!(decoded.as_slice(), expected.as_slice());
    }

    #[test]
    fn test_packed_decoder_skip() {
        let mut rng = rng();
        let len: usize = rng.random_range(512..1024);

        let (encoded, expected, total_value) = random_levels(&mut rng, len);
        let mut decoder = rle_decoder(encoded);

        let mut skip_value = 0;
        let mut read_value = 0;
        let mut skip_level = 0;
        let mut read_level = 0;

        // Randomly interleave skips and reads until every level has been consumed.
        while skip_level + read_level != len {
            let offset = skip_level + read_level;
            let step = rng.random_range(1..=len - offset);

            if rng.random_bool(0.5) {
                let (values, levels) = decoder.skip(step).unwrap();
                skip_value += values;
                skip_level += levels;
            } else {
                let mut decoded = BooleanBufferBuilder::new(step);
                let levels = decoder.read(&mut decoded, step).unwrap();
                read_level += levels;

                for i in 0..levels {
                    assert!(!decoded.is_empty());
                    let read_bit = decoded.get_bit(i);
                    if read_bit {
                        read_value += 1;
                    }
                    assert_eq!(read_bit, expected.get_bit(i + offset));
                }
            }
        }

        assert_eq!(read_level + skip_level, len);
        assert_eq!(read_value + skip_value, total_value);
    }

    #[test]
    fn test_try_consume_all_valid() {
        let len = 100;

        // A run made only of set levels is consumed in one step.
        let mut decoder = rle_decoder(rle_runs(&[(1, len)]));
        assert_eq!(decoder.try_consume_all_valid(len).unwrap(), Some(len));

        // A run made only of unset levels is never consumed this way.
        let mut decoder = rle_decoder(rle_runs(&[(0, len)]));
        assert_eq!(decoder.try_consume_all_valid(len).unwrap(), None);

        // A request spanning set and unset levels is rejected.
        let mixed: [(u64, usize); 2] = [(1, 10), (0, 10)];
        let mut decoder = rle_decoder(rle_runs(&mixed));
        assert_eq!(decoder.try_consume_all_valid(20).unwrap(), None);

        // Reusing the decoder: the ten set levels go in two steps, then the unset ones refuse.
        decoder.set_data(Encoding::RLE, rle_runs(&mixed));
        for expected in [Some(5), Some(5), None] {
            assert_eq!(decoder.try_consume_all_valid(5).unwrap(), expected);
        }
    }
}