parquet/encodings/encoding/
dict_encoder.rs1use bytes::Bytes;
22
23use crate::basic::{Encoding, Type};
24use crate::data_type::DataType;
25use crate::data_type::private::ParquetValueType;
26use crate::encodings::encoding::{Encoder, PlainEncoder};
27use crate::encodings::rle::RleEncoder;
28use crate::errors::Result;
29use crate::schema::types::ColumnDescPtr;
30use crate::util::bit_util::num_required_bits;
31use crate::util::interner::{Interner, Storage};
32
33#[derive(Debug)]
34struct KeyStorage<T: DataType> {
35 uniques: Vec<T::T>,
36
37 size_in_bytes: usize,
39
40 type_length: usize,
41}
42
43impl<T: DataType> Storage for KeyStorage<T> {
44 type Key = u64;
45 type Value = T::T;
46
47 fn get(&self, idx: Self::Key) -> &Self::Value {
48 &self.uniques[idx as usize]
49 }
50
51 fn push(&mut self, value: &Self::Value) -> Self::Key {
52 let (base_size, num_elements) = value.dict_encoding_size();
53
54 let unique_size = match T::get_physical_type() {
55 Type::BYTE_ARRAY => base_size + num_elements,
56 Type::FIXED_LEN_BYTE_ARRAY => self.type_length,
57 _ => base_size,
58 };
59 self.size_in_bytes += unique_size;
60
61 let key = self.uniques.len() as u64;
62 self.uniques.push(value.clone());
63 key
64 }
65
66 fn estimated_memory_size(&self) -> usize {
67 let uniques_heap_bytes = match T::get_physical_type() {
68 Type::FIXED_LEN_BYTE_ARRAY => self.type_length * self.uniques.len(),
69 _ => <Self::Value as ParquetValueType>::variable_length_bytes(&self.uniques)
70 .unwrap_or(0) as usize,
71 };
72 self.uniques.capacity() * std::mem::size_of::<T::T>() + uniques_heap_bytes
73 }
74}
75
76pub struct DictEncoder<T: DataType> {
87 interner: Interner<KeyStorage<T>>,
88
89 indices: Vec<u64>,
91}
92
93impl<T: DataType> DictEncoder<T> {
94 pub fn new(desc: ColumnDescPtr) -> Self {
96 let storage = KeyStorage {
97 uniques: vec![],
98 size_in_bytes: 0,
99 type_length: desc.type_length() as usize,
100 };
101
102 Self {
103 interner: Interner::new(storage),
104 indices: vec![],
105 }
106 }
107
108 pub fn is_sorted(&self) -> bool {
110 false
112 }
113
114 pub fn num_entries(&self) -> usize {
116 self.interner.storage().uniques.len()
117 }
118
119 pub fn dict_encoded_size(&self) -> usize {
121 self.interner.storage().size_in_bytes
122 }
123
124 pub fn write_dict(&self) -> Result<Bytes> {
127 let mut plain_encoder = PlainEncoder::<T>::new();
128 plain_encoder.put(&self.interner.storage().uniques)?;
129 plain_encoder.flush_buffer()
130 }
131
132 pub fn write_indices(&mut self) -> Result<Bytes> {
135 let buffer_len = self.estimated_data_encoded_size();
136 let mut buffer = Vec::with_capacity(buffer_len);
137 buffer.push(self.bit_width());
138
139 let mut encoder = RleEncoder::new_from_buf(self.bit_width(), buffer);
141 for index in &self.indices {
142 encoder.put(*index)
143 }
144 self.indices.clear();
145 Ok(encoder.consume().into())
146 }
147
148 fn put_one(&mut self, value: &T::T) {
149 self.indices.push(self.interner.intern(value));
150 }
151
152 #[inline]
153 fn bit_width(&self) -> u8 {
154 num_required_bits(self.num_entries().saturating_sub(1) as u64)
155 }
156}
157
158impl<T: DataType> Encoder<T> for DictEncoder<T> {
159 fn put(&mut self, values: &[T::T]) -> Result<()> {
160 self.indices.reserve(values.len());
161 for i in values {
162 self.put_one(i)
163 }
164 Ok(())
165 }
166
167 fn encoding(&self) -> Encoding {
171 Encoding::PLAIN_DICTIONARY
172 }
173
174 fn estimated_data_encoded_size(&self) -> usize {
179 let bit_width = self.bit_width();
180 RleEncoder::max_buffer_size(bit_width, self.indices.len())
181 }
182
183 fn flush_buffer(&mut self) -> Result<Bytes> {
184 self.write_indices()
185 }
186
187 fn estimated_memory_size(&self) -> usize {
191 self.interner.estimated_memory_size() + self.indices.capacity() * std::mem::size_of::<u64>()
192 }
193}
194
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;
    use crate::data_type::{
        ByteArray, ByteArrayType, FixedLenByteArray, FixedLenByteArrayType, Int32Type,
    };
    use crate::encodings::encoding::Encoder;
    use crate::schema::types::{ColumnDescriptor, ColumnPath, Type as SchemaType};

    /// Builds a column descriptor for `T` with a type length of `-1`.
    fn make_col_desc<T: DataType>() -> ColumnDescPtr {
        make_col_desc_with_length::<T>(-1)
    }

    /// Builds a column descriptor for a primitive column named "col" of `T`'s
    /// physical type with the given type length.
    fn make_col_desc_with_length<T: DataType>(type_length: i32) -> ColumnDescPtr {
        let primitive = SchemaType::primitive_type_builder("col", T::get_physical_type())
            .with_length(type_length)
            .build()
            .unwrap();
        let descriptor = ColumnDescriptor::new(Arc::new(primitive), 0, 0, ColumnPath::new(vec![]));
        Arc::new(descriptor)
    }

    /// Creates an encoder for `desc`, feeds it `values`, and returns the
    /// estimated memory size before and after the `put`.
    fn sizes_around_put<T: DataType>(desc: ColumnDescPtr, values: &[T::T]) -> (usize, usize) {
        let mut encoder = DictEncoder::<T>::new(desc);
        let empty_size = encoder.estimated_memory_size();
        encoder.put(values).unwrap();
        (empty_size, encoder.estimated_memory_size())
    }

    /// Asserts that `size` covers the empty baseline plus the dictionary
    /// storage, and then also the buffered indices on top of that.
    #[track_caller]
    fn assert_covers_dict_and_indices(
        size: usize,
        empty_size: usize,
        dict_entry_size: usize,
        indices_size: usize,
    ) {
        assert!(
            size >= empty_size + dict_entry_size,
            "memory size {size} should grow by at least the dict storage ({dict_entry_size} bytes)"
        );
        assert!(
            size >= empty_size + dict_entry_size + indices_size,
            "memory size {size} should include indices ({indices_size} bytes)"
        );
    }

    #[test]
    fn test_estimated_memory_size_primitive_with_duplicates() {
        let (empty_size, size) = sizes_around_put::<Int32Type>(
            make_col_desc::<Int32Type>(),
            &[1, 2, 3, 1, 2, 3, 1, 2, 3],
        );

        // Three distinct entries, nine buffered indices.
        assert_covers_dict_and_indices(
            size,
            empty_size,
            3 * std::mem::size_of::<i32>(),
            9 * std::mem::size_of::<u64>(),
        );
    }

    #[test]
    fn test_estimated_memory_size_primitive_all_distinct() {
        let values: Vec<i32> = (0..100).collect();
        let (empty_size, size) =
            sizes_around_put::<Int32Type>(make_col_desc::<Int32Type>(), &values);

        // One hundred distinct entries, one hundred buffered indices.
        assert_covers_dict_and_indices(
            size,
            empty_size,
            100 * std::mem::size_of::<i32>(),
            100 * std::mem::size_of::<u64>(),
        );
    }

    #[test]
    fn test_estimated_memory_size_byte_array_with_duplicates() {
        // "foo", "bar", "baz" repeated three times.
        let vals: Vec<ByteArray> = ["foo", "bar", "baz"]
            .iter()
            .cycle()
            .take(9)
            .map(|s| ByteArray::from(*s))
            .collect();
        let (empty_size, size) =
            sizes_around_put::<ByteArrayType>(make_col_desc::<ByteArrayType>(), &vals);

        // Three distinct entries of three bytes each, nine buffered indices.
        assert_covers_dict_and_indices(
            size,
            empty_size,
            3 * std::mem::size_of::<ByteArray>() + 3 * 3,
            9 * std::mem::size_of::<u64>(),
        );
    }

    #[test]
    fn test_estimated_memory_size_byte_array_all_distinct() {
        let values: Vec<ByteArray> = (0..100_u32)
            .map(|i| ByteArray::from(i.to_string().into_bytes()))
            .collect();
        let bytes_total: usize = values.iter().map(|v| v.len()).sum();
        let (empty_size, size) =
            sizes_around_put::<ByteArrayType>(make_col_desc::<ByteArrayType>(), &values);

        assert_covers_dict_and_indices(
            size,
            empty_size,
            100 * std::mem::size_of::<ByteArray>() + bytes_total,
            100 * std::mem::size_of::<u64>(),
        );
    }

    #[test]
    fn test_estimated_memory_size_fixed_len_byte_array_with_duplicates() {
        const TYPE_LEN: usize = 3;
        let desc = make_col_desc_with_length::<FixedLenByteArrayType>(TYPE_LEN as i32);

        // b"foo", b"bar", b"baz" repeated three times.
        let vals = [b"foo", b"bar", b"baz"]
            .iter()
            .cycle()
            .take(9)
            .map(|b| FixedLenByteArray::from(b.to_vec()))
            .collect::<Vec<_>>();
        let (empty_size, size) = sizes_around_put::<FixedLenByteArrayType>(desc, &vals);

        assert_covers_dict_and_indices(
            size,
            empty_size,
            3 * std::mem::size_of::<FixedLenByteArray>() + 3 * TYPE_LEN,
            9 * std::mem::size_of::<u64>(),
        );
    }

    #[test]
    fn test_estimated_memory_size_fixed_len_byte_array_all_distinct() {
        const TYPE_LEN: usize = 3;
        let desc = make_col_desc_with_length::<FixedLenByteArrayType>(TYPE_LEN as i32);

        let values = (0..100_u8)
            .map(|i| FixedLenByteArray::from(vec![0u8, 0u8, i]))
            .collect::<Vec<_>>();
        let (empty_size, size) = sizes_around_put::<FixedLenByteArrayType>(desc, &values);

        assert_covers_dict_and_indices(
            size,
            empty_size,
            100 * std::mem::size_of::<FixedLenByteArray>() + 100 * TYPE_LEN,
            100 * std::mem::size_of::<u64>(),
        );
    }

    #[test]
    fn test_estimated_memory_size_includes_interner_dedup_table() {
        // A freshly constructed encoder, before any value has been put.
        let encoder = DictEncoder::<Int32Type>::new(make_col_desc::<Int32Type>());

        let size = encoder.estimated_memory_size();

        assert!(
            size > 0,
            "memory size should include the preallocated dedup hash table"
        );
    }

    #[test]
    fn test_estimated_memory_size_accounts_for_indices_capacity() {
        let mut encoder = DictEncoder::<Int32Type>::new(make_col_desc::<Int32Type>());

        // Buffer 64 indices, then flush them.
        encoder.put(&[0_i32; 64]).unwrap();
        drop(encoder.flush_buffer().unwrap());
        let flushed_size = encoder.estimated_memory_size();

        // One more index for the same value must leave the estimate unchanged.
        encoder.put(&[0]).unwrap();
        let size = encoder.estimated_memory_size();

        assert_eq!(
            size, flushed_size,
            "memory size should include retained indices capacity",
        );
    }

    #[test]
    fn test_estimated_memory_size_accounts_for_uniques_capacity() {
        let mut encoder = DictEncoder::<Int32Type>::new(make_col_desc::<Int32Type>());

        let first_batch: Vec<i32> = (0..64).collect();
        encoder.put(&first_batch).unwrap();
        drop(encoder.flush_buffer().unwrap());
        let size1 = encoder.estimated_memory_size();

        // A second batch of 64 values, none of which were in the first batch.
        let second_batch: Vec<i32> = (64..128).collect();
        encoder.put(&second_batch).unwrap();
        drop(encoder.flush_buffer().unwrap());
        let size2 = encoder.estimated_memory_size();

        let min_uniques_bytes = 64 * std::mem::size_of::<i32>();
        assert!(
            size2 >= size1 + min_uniques_bytes,
            "memory size {size2} should grow from {size1} by allocated uniques capacity \
             (at least {min_uniques_bytes} bytes)"
        );
    }
}