parquet/arrow/record_reader/
definition_levels.rs1use arrow_array::builder::BooleanBufferBuilder;
19use arrow_buffer::bit_chunk_iterator::UnalignedBitChunk;
20use arrow_buffer::Buffer;
21use bytes::Bytes;
22
23use crate::arrow::buffer::bit_util::count_set_bits;
24use crate::basic::Encoding;
25use crate::column::reader::decoder::{
26 ColumnLevelDecoder, DefinitionLevelDecoder, DefinitionLevelDecoderImpl,
27};
28use crate::errors::{ParquetError, Result};
29use crate::schema::types::ColumnDescPtr;
30
31enum BufferInner {
32 Full {
34 levels: Vec<i16>,
35 nulls: BooleanBufferBuilder,
36 max_level: i16,
37 },
38 Mask { nulls: BooleanBufferBuilder },
44}
45
46pub struct DefinitionLevelBuffer {
47 inner: BufferInner,
48
49 len: usize,
53}
54
55impl DefinitionLevelBuffer {
56 pub fn new(desc: &ColumnDescPtr, null_mask_only: bool) -> Self {
57 let inner = match null_mask_only {
58 true => {
59 assert_eq!(
60 desc.max_def_level(),
61 1,
62 "max definition level must be 1 to only compute null bitmask"
63 );
64
65 assert_eq!(
66 desc.max_rep_level(),
67 0,
68 "max repetition level must be 0 to only compute null bitmask"
69 );
70
71 BufferInner::Mask {
72 nulls: BooleanBufferBuilder::new(0),
73 }
74 }
75 false => BufferInner::Full {
76 levels: Vec::new(),
77 nulls: BooleanBufferBuilder::new(0),
78 max_level: desc.max_def_level(),
79 },
80 };
81
82 Self { inner, len: 0 }
83 }
84
85 pub fn consume_levels(&mut self) -> Option<Vec<i16>> {
87 match &mut self.inner {
88 BufferInner::Full { levels, .. } => Some(std::mem::take(levels)),
89 BufferInner::Mask { .. } => None,
90 }
91 }
92
93 pub fn consume_bitmask(&mut self) -> Buffer {
95 self.len = 0;
96 match &mut self.inner {
97 BufferInner::Full { nulls, .. } => nulls.finish().into_inner(),
98 BufferInner::Mask { nulls } => nulls.finish().into_inner(),
99 }
100 }
101
102 pub fn nulls(&self) -> &BooleanBufferBuilder {
103 match &self.inner {
104 BufferInner::Full { nulls, .. } => nulls,
105 BufferInner::Mask { nulls } => nulls,
106 }
107 }
108}
109
110enum MaybePacked {
111 Packed(PackedDecoder),
112 Fallback(DefinitionLevelDecoderImpl),
113}
114
115pub struct DefinitionLevelBufferDecoder {
116 max_level: i16,
117 decoder: MaybePacked,
118}
119
120impl DefinitionLevelBufferDecoder {
121 pub fn new(max_level: i16, packed: bool) -> Self {
122 let decoder = match packed {
123 true => MaybePacked::Packed(PackedDecoder::new()),
124 false => MaybePacked::Fallback(DefinitionLevelDecoderImpl::new(max_level)),
125 };
126
127 Self { max_level, decoder }
128 }
129}
130
131impl ColumnLevelDecoder for DefinitionLevelBufferDecoder {
132 type Buffer = DefinitionLevelBuffer;
133
134 fn set_data(&mut self, encoding: Encoding, data: Bytes) {
135 match &mut self.decoder {
136 MaybePacked::Packed(d) => d.set_data(encoding, data),
137 MaybePacked::Fallback(d) => d.set_data(encoding, data),
138 }
139 }
140}
141
142impl DefinitionLevelDecoder for DefinitionLevelBufferDecoder {
143 fn read_def_levels(
144 &mut self,
145 writer: &mut Self::Buffer,
146 num_levels: usize,
147 ) -> Result<(usize, usize)> {
148 match (&mut writer.inner, &mut self.decoder) {
149 (
150 BufferInner::Full {
151 levels,
152 nulls,
153 max_level,
154 },
155 MaybePacked::Fallback(decoder),
156 ) => {
157 assert_eq!(self.max_level, *max_level);
158
159 let start = levels.len();
160 let (values_read, levels_read) = decoder.read_def_levels(levels, num_levels)?;
161
162 nulls.reserve(levels_read);
163 for i in &levels[start..] {
164 nulls.append(i == max_level);
165 }
166
167 Ok((values_read, levels_read))
168 }
169 (BufferInner::Mask { nulls }, MaybePacked::Packed(decoder)) => {
170 assert_eq!(self.max_level, 1);
171
172 let start = nulls.len();
173 let levels_read = decoder.read(nulls, num_levels)?;
174
175 let values_read = count_set_bits(nulls.as_slice(), start..start + levels_read);
176 Ok((values_read, levels_read))
177 }
178 _ => unreachable!("inconsistent null mask"),
179 }
180 }
181
182 fn skip_def_levels(&mut self, num_levels: usize) -> Result<(usize, usize)> {
183 match &mut self.decoder {
184 MaybePacked::Fallback(decoder) => decoder.skip_def_levels(num_levels),
185 MaybePacked::Packed(decoder) => decoder.skip(num_levels),
186 }
187 }
188}
189
190struct PackedDecoder {
205 data: Bytes,
206 data_offset: usize,
207 rle_left: usize,
208 rle_value: bool,
209 packed_count: usize,
210 packed_offset: usize,
211}
212
213impl PackedDecoder {
214 fn next_rle_block(&mut self) -> Result<()> {
215 let indicator_value = self.decode_header()?;
216 if indicator_value & 1 == 1 {
217 let len = (indicator_value >> 1) as usize;
218 self.packed_count = len * 8;
219 self.packed_offset = 0;
220 } else {
221 self.rle_left = (indicator_value >> 1) as usize;
222 let byte = *self.data.as_ref().get(self.data_offset).ok_or_else(|| {
223 ParquetError::EOF(
224 "unexpected end of file whilst decoding definition levels rle value".into(),
225 )
226 })?;
227
228 self.data_offset += 1;
229 self.rle_value = byte != 0;
230 }
231 Ok(())
232 }
233
234 fn decode_header(&mut self) -> Result<i64> {
236 let mut offset = 0;
237 let mut v: i64 = 0;
238 while offset < 10 {
239 let byte = *self
240 .data
241 .as_ref()
242 .get(self.data_offset + offset)
243 .ok_or_else(|| {
244 ParquetError::EOF(
245 "unexpected end of file whilst decoding definition levels rle header"
246 .into(),
247 )
248 })?;
249
250 v |= ((byte & 0x7F) as i64) << (offset * 7);
251 offset += 1;
252 if byte & 0x80 == 0 {
253 self.data_offset += offset;
254 return Ok(v);
255 }
256 }
257 Err(general_err!("too many bytes for VLQ"))
258 }
259}
260
261impl PackedDecoder {
262 fn new() -> Self {
263 Self {
264 data: Bytes::from(vec![]),
265 data_offset: 0,
266 rle_left: 0,
267 rle_value: false,
268 packed_count: 0,
269 packed_offset: 0,
270 }
271 }
272
273 fn set_data(&mut self, encoding: Encoding, data: Bytes) {
274 self.rle_left = 0;
275 self.rle_value = false;
276 self.packed_offset = 0;
277 self.packed_count = match encoding {
278 Encoding::RLE => 0,
279 #[allow(deprecated)]
280 Encoding::BIT_PACKED => data.len() * 8,
281 _ => unreachable!("invalid level encoding: {}", encoding),
282 };
283 self.data = data;
284 self.data_offset = 0;
285 }
286
287 fn read(&mut self, buffer: &mut BooleanBufferBuilder, len: usize) -> Result<usize> {
288 let mut read = 0;
289 while read != len {
290 if self.rle_left != 0 {
291 let to_read = self.rle_left.min(len - read);
292 buffer.append_n(to_read, self.rle_value);
293 self.rle_left -= to_read;
294 read += to_read;
295 } else if self.packed_count != self.packed_offset {
296 let to_read = (self.packed_count - self.packed_offset).min(len - read);
297 let offset = self.data_offset * 8 + self.packed_offset;
298 buffer.append_packed_range(offset..offset + to_read, self.data.as_ref());
299 self.packed_offset += to_read;
300 read += to_read;
301
302 if self.packed_offset == self.packed_count {
303 self.data_offset += self.packed_count / 8;
304 }
305 } else if self.data_offset == self.data.len() {
306 break;
307 } else {
308 self.next_rle_block()?
309 }
310 }
311 Ok(read)
312 }
313
314 fn skip(&mut self, level_num: usize) -> Result<(usize, usize)> {
318 let mut skipped_value = 0;
319 let mut skipped_level = 0;
320 while skipped_level != level_num {
321 if self.rle_left != 0 {
322 let to_skip = self.rle_left.min(level_num - skipped_level);
323 self.rle_left -= to_skip;
324 skipped_level += to_skip;
325 if self.rle_value {
326 skipped_value += to_skip;
327 }
328 } else if self.packed_count != self.packed_offset {
329 let to_skip =
330 (self.packed_count - self.packed_offset).min(level_num - skipped_level);
331 let offset = self.data_offset * 8 + self.packed_offset;
332 let bit_chunk = UnalignedBitChunk::new(self.data.as_ref(), offset, to_skip);
333 skipped_value += bit_chunk.count_ones();
334 self.packed_offset += to_skip;
335 skipped_level += to_skip;
336 if self.packed_offset == self.packed_count {
337 self.data_offset += self.packed_count / 8;
338 }
339 } else if self.data_offset == self.data.len() {
340 break;
341 } else {
342 self.next_rle_block()?
343 }
344 }
345 Ok((skipped_value, skipped_level))
346 }
347}
348
#[cfg(test)]
mod tests {
    use super::*;

    use crate::encodings::rle::RleEncoder;
    use rand::{thread_rng, Rng};

    /// Round-trips random bits through `RleEncoder` and `PackedDecoder::read`,
    /// reading in randomly sized pieces.
    #[test]
    fn test_packed_decoder() {
        let mut rng = thread_rng();
        let len: usize = rng.gen_range(512..1024);

        let mut expected = BooleanBufferBuilder::new(len);
        let mut encoder = RleEncoder::new(1, 1024);
        for _ in 0..len {
            let bit = rng.gen_bool(0.8);
            encoder.put(bit as u64);
            expected.append(bit);
        }
        assert_eq!(expected.len(), len);

        let encoded = encoder.consume();
        let mut decoder = PackedDecoder::new();
        decoder.set_data(Encoding::RLE, encoded.into());

        let mut decoded = BooleanBufferBuilder::new(len);
        while decoded.len() != len {
            let remaining = len - decoded.len();
            let to_read = rng.gen_range(1..=remaining);
            decoder.read(&mut decoded, to_read).unwrap();
        }

        assert_eq!(decoded.len(), len);
        assert_eq!(decoded.as_slice(), expected.as_slice());
    }

    /// Randomly interleaves `skip` and `read` and checks that every read bit
    /// matches the input and that the value counts add up.
    #[test]
    fn test_packed_decoder_skip() {
        let mut rng = thread_rng();
        let len: usize = rng.gen_range(512..1024);

        let mut expected = BooleanBufferBuilder::new(len);
        let mut encoder = RleEncoder::new(1, 1024);

        let mut total_value = 0;
        for _ in 0..len {
            let bit = rng.gen_bool(0.8);
            encoder.put(bit as u64);
            expected.append(bit);
            if bit {
                total_value += 1;
            }
        }
        assert_eq!(expected.len(), len);

        let encoded = encoder.consume();
        let mut decoder = PackedDecoder::new();
        decoder.set_data(Encoding::RLE, encoded.into());

        let mut skip_value = 0;
        let mut read_value = 0;
        let mut skip_level = 0;
        let mut read_level = 0;

        while skip_level + read_level != len {
            let offset = skip_level + read_level;
            let remaining_levels = len - offset;
            let step = rng.gen_range(1..=remaining_levels);
            if rng.gen_bool(0.5) {
                let (values, levels) = decoder.skip(step).unwrap();
                skip_value += values;
                skip_level += levels
            } else {
                let mut decoded = BooleanBufferBuilder::new(step);
                let levels = decoder.read(&mut decoded, step).unwrap();
                read_level += levels;
                for i in 0..levels {
                    assert!(!decoded.is_empty());
                    // Compare each decoded bit with the bit originally encoded.
                    let read_bit = decoded.get_bit(i);
                    if read_bit {
                        read_value += 1;
                    }
                    let expect_bit = expected.get_bit(i + offset);
                    assert_eq!(read_bit, expect_bit);
                }
            }
        }
        assert_eq!(read_level + skip_level, len);
        assert_eq!(read_value + skip_value, total_value);
    }
}