parquet/arrow/decoder/
delta_byte_array.rs1use bytes::Bytes;
19
20use crate::data_type::Int32Type;
21use crate::encodings::decoding::{Decoder, DeltaBitPackDecoder};
22use crate::errors::{ParquetError, Result};
23
24pub struct DeltaByteArrayDecoder {
26 prefix_lengths: Vec<i32>,
27 suffix_lengths: Vec<i32>,
28 data: Bytes,
29 length_offset: usize,
30 data_offset: usize,
31 last_value: Vec<u8>,
32}
33
34impl DeltaByteArrayDecoder {
35 pub fn new(data: Bytes) -> Result<Self> {
37 let mut prefix = DeltaBitPackDecoder::<Int32Type>::new();
38 prefix.set_data(data.clone(), 0)?;
39
40 let num_prefix = prefix.values_left();
41 let mut prefix_lengths = vec![0; num_prefix];
42 assert_eq!(prefix.get(&mut prefix_lengths)?, num_prefix);
43
44 let mut suffix = DeltaBitPackDecoder::<Int32Type>::new();
45 suffix.set_data(data.slice(prefix.get_offset()..), 0)?;
46
47 let num_suffix = suffix.values_left();
48 let mut suffix_lengths = vec![0; num_suffix];
49 assert_eq!(suffix.get(&mut suffix_lengths)?, num_suffix);
50
51 if num_prefix != num_suffix {
52 return Err(general_err!(format!(
53 "inconsistent DELTA_BYTE_ARRAY lengths, prefixes: {num_prefix}, suffixes: {num_suffix}"
54 )));
55 }
56
57 assert_eq!(prefix_lengths.len(), suffix_lengths.len());
58
59 Ok(Self {
60 prefix_lengths,
61 suffix_lengths,
62 data,
63 length_offset: 0,
64 data_offset: prefix.get_offset() + suffix.get_offset(),
65 last_value: vec![],
66 })
67 }
68
69 pub fn remaining(&self) -> usize {
71 self.prefix_lengths.len() - self.length_offset
72 }
73
74 pub fn read<F>(&mut self, len: usize, mut f: F) -> Result<usize>
79 where
80 F: FnMut(&[u8]) -> Result<()>,
81 {
82 let to_read = len.min(self.remaining());
83
84 let length_range = self.length_offset..self.length_offset + to_read;
85 let iter = self.prefix_lengths[length_range.clone()]
86 .iter()
87 .zip(&self.suffix_lengths[length_range]);
88
89 let data = self.data.as_ref();
90
91 for (prefix_length, suffix_length) in iter {
92 let prefix_length = *prefix_length as usize;
93 let suffix_length = *suffix_length as usize;
94
95 if self.data_offset + suffix_length > self.data.len() {
96 return Err(ParquetError::EOF("eof decoding byte array".into()));
97 }
98
99 self.last_value.truncate(prefix_length);
100 self.last_value
101 .extend_from_slice(&data[self.data_offset..self.data_offset + suffix_length]);
102 f(&self.last_value)?;
103
104 self.data_offset += suffix_length;
105 }
106
107 self.length_offset += to_read;
108 Ok(to_read)
109 }
110
111 pub fn skip(&mut self, to_skip: usize) -> Result<usize> {
113 let to_skip = to_skip.min(self.prefix_lengths.len() - self.length_offset);
114
115 let length_range = self.length_offset..self.length_offset + to_skip;
116 let iter = self.prefix_lengths[length_range.clone()]
117 .iter()
118 .zip(&self.suffix_lengths[length_range]);
119
120 let data = self.data.as_ref();
121
122 for (prefix_length, suffix_length) in iter {
123 let prefix_length = *prefix_length as usize;
124 let suffix_length = *suffix_length as usize;
125
126 if self.data_offset + suffix_length > self.data.len() {
127 return Err(ParquetError::EOF("eof decoding byte array".into()));
128 }
129
130 self.last_value.truncate(prefix_length);
131 self.last_value
132 .extend_from_slice(&data[self.data_offset..self.data_offset + suffix_length]);
133 self.data_offset += suffix_length;
134 }
135 self.length_offset += to_skip;
136 Ok(to_skip)
137 }
138}