parquet/arrow/record_reader/
definition_levels.rs1use arrow_array::builder::BooleanBufferBuilder;
19use arrow_buffer::Buffer;
20use arrow_buffer::bit_chunk_iterator::UnalignedBitChunk;
21use bytes::Bytes;
22
23use crate::arrow::buffer::bit_util::count_set_bits;
24use crate::basic::Encoding;
25use crate::column::reader::decoder::{
26 ColumnLevelDecoder, DefinitionLevelDecoder, DefinitionLevelDecoderImpl,
27};
28use crate::errors::{ParquetError, Result};
29use crate::schema::types::ColumnDescPtr;
30
31enum BufferInner {
32 Full {
34 levels: Vec<i16>,
35 nulls: BooleanBufferBuilder,
36 max_level: i16,
37 },
38 Mask { nulls: BooleanBufferBuilder },
44}
45
46pub struct DefinitionLevelBuffer {
47 inner: BufferInner,
48
49 len: usize,
53}
54
55impl DefinitionLevelBuffer {
56 pub fn new(desc: &ColumnDescPtr, null_mask_only: bool) -> Self {
57 let inner = match null_mask_only {
58 true => {
59 assert_eq!(
60 desc.max_def_level(),
61 1,
62 "max definition level must be 1 to only compute null bitmask"
63 );
64
65 assert_eq!(
66 desc.max_rep_level(),
67 0,
68 "max repetition level must be 0 to only compute null bitmask"
69 );
70
71 BufferInner::Mask {
72 nulls: BooleanBufferBuilder::new(0),
73 }
74 }
75 false => BufferInner::Full {
76 levels: Vec::new(),
77 nulls: BooleanBufferBuilder::new(0),
78 max_level: desc.max_def_level(),
79 },
80 };
81
82 Self { inner, len: 0 }
83 }
84
85 pub fn consume_levels(&mut self) -> Option<Vec<i16>> {
87 match &mut self.inner {
88 BufferInner::Full { levels, .. } => Some(std::mem::take(levels)),
89 BufferInner::Mask { .. } => None,
90 }
91 }
92
93 pub fn consume_bitmask(&mut self) -> Buffer {
95 self.len = 0;
96 match &mut self.inner {
97 BufferInner::Full { nulls, .. } => nulls.finish().into_inner(),
98 BufferInner::Mask { nulls } => nulls.finish().into_inner(),
99 }
100 }
101
102 pub fn nulls(&self) -> &BooleanBufferBuilder {
103 match &self.inner {
104 BufferInner::Full { nulls, .. } => nulls,
105 BufferInner::Mask { nulls } => nulls,
106 }
107 }
108}
109
110enum MaybePacked {
111 Packed(PackedDecoder),
112 Fallback(DefinitionLevelDecoderImpl),
113}
114
115pub struct DefinitionLevelBufferDecoder {
116 max_level: i16,
117 decoder: MaybePacked,
118}
119
120impl DefinitionLevelBufferDecoder {
121 pub fn new(max_level: i16, packed: bool) -> Self {
122 let decoder = match packed {
123 true => MaybePacked::Packed(PackedDecoder::new()),
124 false => MaybePacked::Fallback(DefinitionLevelDecoderImpl::new(max_level)),
125 };
126
127 Self { max_level, decoder }
128 }
129}
130
131impl ColumnLevelDecoder for DefinitionLevelBufferDecoder {
132 type Buffer = DefinitionLevelBuffer;
133
134 fn set_data(&mut self, encoding: Encoding, data: Bytes) -> Result<()> {
135 match &mut self.decoder {
136 MaybePacked::Packed(d) => d.set_data(encoding, data),
137 MaybePacked::Fallback(d) => d.set_data(encoding, data)?,
138 };
139 Ok(())
140 }
141}
142
143impl DefinitionLevelDecoder for DefinitionLevelBufferDecoder {
144 fn read_def_levels(
145 &mut self,
146 writer: &mut Self::Buffer,
147 num_levels: usize,
148 ) -> Result<(usize, usize)> {
149 match (&mut writer.inner, &mut self.decoder) {
150 (
151 BufferInner::Full {
152 levels,
153 nulls,
154 max_level,
155 },
156 MaybePacked::Fallback(decoder),
157 ) => {
158 assert_eq!(self.max_level, *max_level);
159
160 let start = levels.len();
161 let (values_read, levels_read) = decoder.read_def_levels(levels, num_levels)?;
162
163 nulls.reserve(levels_read);
164 for i in &levels[start..] {
165 nulls.append(i == max_level);
166 }
167
168 Ok((values_read, levels_read))
169 }
170 (BufferInner::Mask { nulls }, MaybePacked::Packed(decoder)) => {
171 assert_eq!(self.max_level, 1);
172
173 let start = nulls.len();
174 let levels_read = decoder.read(nulls, num_levels)?;
175
176 let values_read = count_set_bits(nulls.as_slice(), start..start + levels_read);
177 Ok((values_read, levels_read))
178 }
179 _ => unreachable!("inconsistent null mask"),
180 }
181 }
182
183 fn skip_def_levels(&mut self, num_levels: usize) -> Result<(usize, usize)> {
184 match &mut self.decoder {
185 MaybePacked::Fallback(decoder) => decoder.skip_def_levels(num_levels),
186 MaybePacked::Packed(decoder) => decoder.skip(num_levels),
187 }
188 }
189}
190
191struct PackedDecoder {
206 data: Bytes,
207 data_offset: usize,
208 rle_left: usize,
209 rle_value: bool,
210 packed_count: usize,
211 packed_offset: usize,
212}
213
214impl PackedDecoder {
215 fn next_rle_block(&mut self) -> Result<()> {
216 let indicator_value = self.decode_header()?;
217 if indicator_value & 1 == 1 {
218 let len = (indicator_value >> 1) as usize;
219 self.packed_count = len * 8;
220 self.packed_offset = 0;
221 } else {
222 self.rle_left = (indicator_value >> 1) as usize;
223 let byte = *self.data.as_ref().get(self.data_offset).ok_or_else(|| {
224 ParquetError::EOF(
225 "unexpected end of file whilst decoding definition levels rle value".into(),
226 )
227 })?;
228
229 self.data_offset += 1;
230 self.rle_value = byte != 0;
231 }
232 Ok(())
233 }
234
235 fn decode_header(&mut self) -> Result<i64> {
237 let mut offset = 0;
238 let mut v: i64 = 0;
239 while offset < 10 {
240 let byte = *self
241 .data
242 .as_ref()
243 .get(self.data_offset + offset)
244 .ok_or_else(|| {
245 ParquetError::EOF(
246 "unexpected end of file whilst decoding definition levels rle header"
247 .into(),
248 )
249 })?;
250
251 v |= ((byte & 0x7F) as i64) << (offset * 7);
252 offset += 1;
253 if byte & 0x80 == 0 {
254 self.data_offset += offset;
255 return Ok(v);
256 }
257 }
258 Err(general_err!("too many bytes for VLQ"))
259 }
260}
261
262impl PackedDecoder {
263 fn new() -> Self {
264 Self {
265 data: Bytes::from(vec![]),
266 data_offset: 0,
267 rle_left: 0,
268 rle_value: false,
269 packed_count: 0,
270 packed_offset: 0,
271 }
272 }
273
274 fn set_data(&mut self, encoding: Encoding, data: Bytes) {
275 self.rle_left = 0;
276 self.rle_value = false;
277 self.packed_offset = 0;
278 self.packed_count = match encoding {
279 Encoding::RLE => 0,
280 #[allow(deprecated)]
281 Encoding::BIT_PACKED => data.len() * 8,
282 _ => unreachable!("invalid level encoding: {}", encoding),
283 };
284 self.data = data;
285 self.data_offset = 0;
286 }
287
288 fn read(&mut self, buffer: &mut BooleanBufferBuilder, len: usize) -> Result<usize> {
289 let mut read = 0;
290 while read != len {
291 if self.rle_left != 0 {
292 let to_read = self.rle_left.min(len - read);
293 buffer.append_n(to_read, self.rle_value);
294 self.rle_left -= to_read;
295 read += to_read;
296 } else if self.packed_count != self.packed_offset {
297 let to_read = (self.packed_count - self.packed_offset).min(len - read);
298 let offset = self.data_offset * 8 + self.packed_offset;
299 buffer.append_packed_range(offset..offset + to_read, self.data.as_ref());
300 self.packed_offset += to_read;
301 read += to_read;
302
303 if self.packed_offset == self.packed_count {
304 self.data_offset += self.packed_count / 8;
305 }
306 } else if self.data_offset == self.data.len() {
307 break;
308 } else {
309 self.next_rle_block()?
310 }
311 }
312 Ok(read)
313 }
314
315 fn skip(&mut self, level_num: usize) -> Result<(usize, usize)> {
319 let mut skipped_value = 0;
320 let mut skipped_level = 0;
321 while skipped_level != level_num {
322 if self.rle_left != 0 {
323 let to_skip = self.rle_left.min(level_num - skipped_level);
324 self.rle_left -= to_skip;
325 skipped_level += to_skip;
326 if self.rle_value {
327 skipped_value += to_skip;
328 }
329 } else if self.packed_count != self.packed_offset {
330 let to_skip =
331 (self.packed_count - self.packed_offset).min(level_num - skipped_level);
332 let offset = self.data_offset * 8 + self.packed_offset;
333 let bit_chunk = UnalignedBitChunk::new(self.data.as_ref(), offset, to_skip);
334 skipped_value += bit_chunk.count_ones();
335 self.packed_offset += to_skip;
336 skipped_level += to_skip;
337 if self.packed_offset == self.packed_count {
338 self.data_offset += self.packed_count / 8;
339 }
340 } else if self.data_offset == self.data.len() {
341 break;
342 } else {
343 self.next_rle_block()?
344 }
345 }
346 Ok((skipped_value, skipped_level))
347 }
348}
349
#[cfg(test)]
mod tests {
    use super::*;

    use crate::encodings::rle::RleEncoder;
    use rand::{Rng, rng};

    /// Draws `len` random bits (each set with probability 0.8) and RLE-encodes
    /// them with a bit width of 1.
    ///
    /// Returns the encoded bytes and the expected bitmask.
    fn encode_random_levels(rng: &mut impl Rng, len: usize) -> (Bytes, BooleanBufferBuilder) {
        let mut encoder = RleEncoder::new(1, 1024);
        let mut expected = BooleanBufferBuilder::new(len);
        for _ in 0..len {
            let bit = rng.random_bool(0.8);
            encoder.put(bit as u64);
            expected.append(bit);
        }
        (encoder.consume().into(), expected)
    }

    #[test]
    fn test_packed_decoder() {
        let mut rng = rng();
        let len: usize = rng.random_range(512..1024);

        let (encoded, expected) = encode_random_levels(&mut rng, len);
        assert_eq!(expected.len(), len);

        let mut decoder = PackedDecoder::new();
        decoder.set_data(Encoding::RLE, encoded);

        // Decode in randomly sized batches until every level has been read.
        let mut decoded = BooleanBufferBuilder::new(len);
        while decoded.len() < len {
            let batch = rng.random_range(1..=len - decoded.len());
            decoder.read(&mut decoded, batch).unwrap();
        }

        assert_eq!(decoded.len(), len);
        assert_eq!(decoded.as_slice(), expected.as_slice());
    }

    #[test]
    fn test_packed_decoder_skip() {
        let mut rng = rng();
        let len: usize = rng.random_range(512..1024);

        let (encoded, expected) = encode_random_levels(&mut rng, len);
        assert_eq!(expected.len(), len);
        let total_value = (0..len).filter(|&i| expected.get_bit(i)).count();

        let mut decoder = PackedDecoder::new();
        decoder.set_data(Encoding::RLE, encoded);

        let (mut skip_value, mut skip_level) = (0, 0);
        let (mut read_value, mut read_level) = (0, 0);

        // Randomly interleave skips and reads. Every bit that is read must
        // match the expected bitmask at its absolute position.
        loop {
            let offset = skip_level + read_level;
            if offset == len {
                break;
            }

            let step = rng.random_range(1..=len - offset);
            if rng.random_bool(0.5) {
                let (values, levels) = decoder.skip(step).unwrap();
                skip_value += values;
                skip_level += levels;
            } else {
                let mut decoded = BooleanBufferBuilder::new(step);
                let levels = decoder.read(&mut decoded, step).unwrap();
                read_level += levels;
                for i in 0..levels {
                    let bit = decoded.get_bit(i);
                    assert_eq!(bit, expected.get_bit(offset + i));
                    read_value += usize::from(bit);
                }
            }
        }

        assert_eq!(read_level + skip_level, len);
        assert_eq!(read_value + skip_value, total_value);
    }
}