Skip to main content

parquet/encodings/encoding/
dict_encoder.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18// ----------------------------------------------------------------------
19// Dictionary encoding
20
21use bytes::Bytes;
22
23use crate::basic::{Encoding, Type};
24use crate::data_type::DataType;
25use crate::data_type::private::ParquetValueType;
26use crate::encodings::encoding::{Encoder, PlainEncoder};
27use crate::encodings::rle::RleEncoder;
28use crate::errors::Result;
29use crate::schema::types::ColumnDescPtr;
30use crate::util::bit_util::num_required_bits;
31use crate::util::interner::{Interner, Storage};
32
33#[derive(Debug)]
34struct KeyStorage<T: DataType> {
35    uniques: Vec<T::T>,
36
37    /// size of unique values (keys) in the dictionary, in bytes.
38    size_in_bytes: usize,
39
40    type_length: usize,
41}
42
43impl<T: DataType> Storage for KeyStorage<T> {
44    type Key = u64;
45    type Value = T::T;
46
47    fn get(&self, idx: Self::Key) -> &Self::Value {
48        &self.uniques[idx as usize]
49    }
50
51    fn push(&mut self, value: &Self::Value) -> Self::Key {
52        let (base_size, num_elements) = value.dict_encoding_size();
53
54        let unique_size = match T::get_physical_type() {
55            Type::BYTE_ARRAY => base_size + num_elements,
56            Type::FIXED_LEN_BYTE_ARRAY => self.type_length,
57            _ => base_size,
58        };
59        self.size_in_bytes += unique_size;
60
61        let key = self.uniques.len() as u64;
62        self.uniques.push(value.clone());
63        key
64    }
65
66    fn estimated_memory_size(&self) -> usize {
67        let uniques_heap_bytes = match T::get_physical_type() {
68            Type::FIXED_LEN_BYTE_ARRAY => self.type_length * self.uniques.len(),
69            _ => <Self::Value as ParquetValueType>::variable_length_bytes(&self.uniques)
70                .unwrap_or(0) as usize,
71        };
72        self.uniques.capacity() * std::mem::size_of::<T::T>() + uniques_heap_bytes
73    }
74}
75
76/// Dictionary encoder.
77/// The dictionary encoding builds a dictionary of values encountered in a given column.
78/// The dictionary page is written first, before the data pages of the column chunk.
79///
80/// Dictionary page format: the entries in the dictionary - in dictionary order -
81/// using the plain encoding.
82///
83/// Data page format: the bit width used to encode the entry ids stored as 1 byte
84/// (max bit width = 32), followed by the values encoded using RLE/Bit packed described
85/// above (with the given bit width).
86pub struct DictEncoder<T: DataType> {
87    interner: Interner<KeyStorage<T>>,
88
89    /// The buffered indices
90    indices: Vec<u64>,
91}
92
93impl<T: DataType> DictEncoder<T> {
94    /// Creates new dictionary encoder.
95    pub fn new(desc: ColumnDescPtr) -> Self {
96        let storage = KeyStorage {
97            uniques: vec![],
98            size_in_bytes: 0,
99            type_length: desc.type_length() as usize,
100        };
101
102        Self {
103            interner: Interner::new(storage),
104            indices: vec![],
105        }
106    }
107
108    /// Returns true if dictionary entries are sorted, false otherwise.
109    pub fn is_sorted(&self) -> bool {
110        // Sorting is not supported currently.
111        false
112    }
113
114    /// Returns number of unique values (keys) in the dictionary.
115    pub fn num_entries(&self) -> usize {
116        self.interner.storage().uniques.len()
117    }
118
119    /// Returns size of unique values (keys) in the dictionary, in bytes.
120    pub fn dict_encoded_size(&self) -> usize {
121        self.interner.storage().size_in_bytes
122    }
123
124    /// Writes out the dictionary values with PLAIN encoding in a byte buffer, and return
125    /// the result.
126    pub fn write_dict(&self) -> Result<Bytes> {
127        let mut plain_encoder = PlainEncoder::<T>::new();
128        plain_encoder.put(&self.interner.storage().uniques)?;
129        plain_encoder.flush_buffer()
130    }
131
132    /// Writes out the dictionary values with RLE encoding in a byte buffer, and return
133    /// the result.
134    pub fn write_indices(&mut self) -> Result<Bytes> {
135        let buffer_len = self.estimated_data_encoded_size();
136        let mut buffer = Vec::with_capacity(buffer_len);
137        buffer.push(self.bit_width());
138
139        // Write bit width in the first byte
140        let mut encoder = RleEncoder::new_from_buf(self.bit_width(), buffer);
141        for index in &self.indices {
142            encoder.put(*index)
143        }
144        self.indices.clear();
145        Ok(encoder.consume().into())
146    }
147
148    fn put_one(&mut self, value: &T::T) {
149        self.indices.push(self.interner.intern(value));
150    }
151
152    #[inline]
153    fn bit_width(&self) -> u8 {
154        num_required_bits(self.num_entries().saturating_sub(1) as u64)
155    }
156}
157
158impl<T: DataType> Encoder<T> for DictEncoder<T> {
159    fn put(&mut self, values: &[T::T]) -> Result<()> {
160        self.indices.reserve(values.len());
161        for i in values {
162            self.put_one(i)
163        }
164        Ok(())
165    }
166
167    // Performance Note:
168    // As far as can be seen these functions are rarely called and as such we can hint to the
169    // compiler that they dont need to be folded into hot locations in the final output.
170    fn encoding(&self) -> Encoding {
171        Encoding::PLAIN_DICTIONARY
172    }
173
174    /// Returns an estimate of the data page size in bytes
175    ///
176    /// This includes:
177    /// <already_written_encoded_byte_size> + <estimated_encoded_size_of_unflushed_bytes>
178    fn estimated_data_encoded_size(&self) -> usize {
179        let bit_width = self.bit_width();
180        RleEncoder::max_buffer_size(bit_width, self.indices.len())
181    }
182
183    fn flush_buffer(&mut self) -> Result<Bytes> {
184        self.write_indices()
185    }
186
187    /// Returns the estimated total memory usage
188    ///
189    /// For this encoder, the indices are unencoded bytes (refer to [`Self::write_indices`]).
190    fn estimated_memory_size(&self) -> usize {
191        self.interner.estimated_memory_size() + self.indices.capacity() * std::mem::size_of::<u64>()
192    }
193}
194
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;
    use crate::data_type::{
        ByteArray, ByteArrayType, FixedLenByteArray, FixedLenByteArrayType, Int32Type,
    };
    use crate::encodings::encoding::Encoder;
    use crate::schema::types::{ColumnDescriptor, ColumnPath, Type as SchemaType};

    /// Builds a column descriptor for `T` without a meaningful type length (-1).
    fn make_col_desc<T: DataType>() -> ColumnDescPtr {
        make_col_desc_with_length::<T>(-1)
    }

    /// Builds a column descriptor for a primitive column named "col" of `T`'s
    /// physical type with the given type length.
    fn make_col_desc_with_length<T: DataType>(type_length: i32) -> ColumnDescPtr {
        let primitive = SchemaType::primitive_type_builder("col", T::get_physical_type())
            .with_length(type_length)
            .build()
            .unwrap();
        let path = ColumnPath::new(vec![]);
        Arc::new(ColumnDescriptor::new(Arc::new(primitive), 0, 0, path))
    }

    /// Asserts the two lower bounds shared by the growth tests: relative to
    /// `empty_size`, `size` must cover the dictionary storage, and must also
    /// cover the dictionary storage plus the buffered indices.
    #[track_caller]
    fn assert_covers_dict_and_indices(
        size: usize,
        empty_size: usize,
        dict_entry_size: usize,
        indices_size: usize,
    ) {
        assert!(
            size >= empty_size + dict_entry_size,
            "memory size {size} should grow by at least the dict storage ({dict_entry_size} bytes)"
        );
        assert!(
            size >= empty_size + dict_entry_size + indices_size,
            "memory size {size} should include indices ({indices_size} bytes)"
        );
    }

    #[test]
    fn test_estimated_memory_size_primitive_with_duplicates() {
        let mut encoder = DictEncoder::<Int32Type>::new(make_col_desc::<Int32Type>());
        let empty_size = encoder.estimated_memory_size();

        // Three distinct values cycled three times: 3 dictionary entries, 9 indices.
        let values: Vec<i32> = [1, 2, 3].repeat(3);
        encoder.put(&values).unwrap();

        assert_covers_dict_and_indices(
            encoder.estimated_memory_size(),
            empty_size,
            3 * std::mem::size_of::<i32>(),
            9 * std::mem::size_of::<u64>(),
        );
    }

    #[test]
    fn test_estimated_memory_size_primitive_all_distinct() {
        let mut encoder = DictEncoder::<Int32Type>::new(make_col_desc::<Int32Type>());
        let empty_size = encoder.estimated_memory_size();

        // 100 distinct values: 100 dictionary entries and 100 indices.
        let values: Vec<i32> = (0..100).collect();
        encoder.put(&values).unwrap();

        assert_covers_dict_and_indices(
            encoder.estimated_memory_size(),
            empty_size,
            100 * std::mem::size_of::<i32>(),
            100 * std::mem::size_of::<u64>(),
        );
    }

    #[test]
    fn test_estimated_memory_size_byte_array_with_duplicates() {
        let mut encoder = DictEncoder::<ByteArrayType>::new(make_col_desc::<ByteArrayType>());
        let empty_size = encoder.estimated_memory_size();

        // Three distinct 3-byte strings cycled three times: 3 entries, 9 indices.
        let distinct = ["foo", "bar", "baz"];
        let vals: Vec<ByteArray> = distinct
            .iter()
            .cycle()
            .take(9)
            .map(|s| ByteArray::from(*s))
            .collect();
        encoder.put(&vals).unwrap();

        // Dictionary entries count their struct size plus the bytes they hold
        // (3 values × 3 bytes each).
        assert_covers_dict_and_indices(
            encoder.estimated_memory_size(),
            empty_size,
            3 * std::mem::size_of::<ByteArray>() + 3 * 3,
            9 * std::mem::size_of::<u64>(),
        );
    }

    #[test]
    fn test_estimated_memory_size_byte_array_all_distinct() {
        let mut encoder = DictEncoder::<ByteArrayType>::new(make_col_desc::<ByteArrayType>());
        let empty_size = encoder.estimated_memory_size();

        // 100 distinct values: "0".."9" (1 byte each) and "10".."99" (2 bytes each),
        // i.e. 10×1 + 90×2 = 190 payload bytes in total.
        let mut bytes_total = 0_usize;
        let values: Vec<ByteArray> = (0..100_u32)
            .map(|i| {
                let value = ByteArray::from(i.to_string().into_bytes());
                bytes_total += value.len();
                value
            })
            .collect();
        encoder.put(&values).unwrap();

        // Dictionary entries count their struct size plus the bytes they hold.
        assert_covers_dict_and_indices(
            encoder.estimated_memory_size(),
            empty_size,
            100 * std::mem::size_of::<ByteArray>() + bytes_total,
            100 * std::mem::size_of::<u64>(),
        );
    }

    #[test]
    fn test_estimated_memory_size_fixed_len_byte_array_with_duplicates() {
        const TYPE_LEN: usize = 3;
        let desc = make_col_desc_with_length::<FixedLenByteArrayType>(TYPE_LEN as i32);
        let mut encoder = DictEncoder::<FixedLenByteArrayType>::new(desc);
        let empty_size = encoder.estimated_memory_size();

        // Three distinct 3-byte values cycled three times: 3 entries, 9 indices.
        let distinct: [&[u8; TYPE_LEN]; 3] = [b"foo", b"bar", b"baz"];
        let vals: Vec<FixedLenByteArray> = distinct
            .iter()
            .cycle()
            .take(9)
            .map(|b| FixedLenByteArray::from(b.to_vec()))
            .collect();
        encoder.put(&vals).unwrap();

        // Dictionary entries count their struct size plus the fixed-length
        // bytes of each entry.
        assert_covers_dict_and_indices(
            encoder.estimated_memory_size(),
            empty_size,
            3 * std::mem::size_of::<FixedLenByteArray>() + 3 * TYPE_LEN,
            9 * std::mem::size_of::<u64>(),
        );
    }

    #[test]
    fn test_estimated_memory_size_fixed_len_byte_array_all_distinct() {
        const TYPE_LEN: usize = 3;
        let desc = make_col_desc_with_length::<FixedLenByteArrayType>(TYPE_LEN as i32);
        let mut encoder = DictEncoder::<FixedLenByteArrayType>::new(desc);
        let empty_size = encoder.estimated_memory_size();

        // 100 distinct 3-byte values: two zero bytes followed by the index byte.
        let values: Vec<FixedLenByteArray> = (0..100_u8)
            .map(|i| FixedLenByteArray::from(vec![0u8, 0u8, i]))
            .collect();
        encoder.put(&values).unwrap();

        // Dictionary entries count their struct size plus the fixed-length
        // bytes of each entry.
        assert_covers_dict_and_indices(
            encoder.estimated_memory_size(),
            empty_size,
            100 * std::mem::size_of::<FixedLenByteArray>() + 100 * TYPE_LEN,
            100 * std::mem::size_of::<u64>(),
        );
    }

    #[test]
    fn test_estimated_memory_size_includes_interner_dedup_table() {
        // Expectation carried over from the original test: the interner's dedup
        // table is preallocated at construction, so even a fresh encoder that
        // has seen no values reports a non-zero size. (The interner itself is
        // defined outside this file.)
        let encoder = DictEncoder::<Int32Type>::new(make_col_desc::<Int32Type>());

        let size = encoder.estimated_memory_size();

        assert!(
            size > 0,
            "memory size should include the preallocated dedup hash table"
        );
    }

    #[test]
    fn test_estimated_memory_size_accounts_for_indices_capacity() {
        // Checks that the estimate follows `indices.capacity()`, not `.len()`:
        // a flush empties `indices` but keeps its allocation, so buffering a
        // smaller batch afterwards must not change the reported size.
        let mut encoder = DictEncoder::<Int32Type>::new(make_col_desc::<Int32Type>());

        let big = [0_i32; 64];
        encoder.put(&big).unwrap();
        drop(encoder.flush_buffer().unwrap());

        let flushed_size = encoder.estimated_memory_size();

        // A single, already-interned value: the key storage is untouched and
        // the retained indices allocation has room for it.
        encoder.put(&[0]).unwrap();

        let size = encoder.estimated_memory_size();

        assert_eq!(
            size, flushed_size,
            "memory size should include retained indices capacity",
        );
    }

    #[test]
    fn test_estimated_memory_size_accounts_for_uniques_capacity() {
        let mut encoder = DictEncoder::<Int32Type>::new(make_col_desc::<Int32Type>());

        let first_batch: Vec<i32> = (0..64).collect();
        encoder.put(&first_batch).unwrap();
        // Flush so buffered indices do not mask the uniques accounting below.
        drop(encoder.flush_buffer().unwrap());

        let size1 = encoder.estimated_memory_size();

        // 64 further distinct values force the uniques storage to grow. The
        // original test notes that the dedup hash table is unlikely to resize here.
        let second_batch: Vec<i32> = (64..128).collect();
        encoder.put(&second_batch).unwrap();
        // Flush again for the same reason as above.
        drop(encoder.flush_buffer().unwrap());

        let size2 = encoder.estimated_memory_size();

        let min_uniques_bytes = 64 * std::mem::size_of::<i32>();
        assert!(
            size2 >= size1 + min_uniques_bytes,
            "memory size {size2} should grow from {size1} by allocated uniques capacity \
             (at least {min_uniques_bytes} bytes)"
        );
    }
}
468}