parquet/encodings/encoding/
dict_encoder.rs1use bytes::Bytes;
22
23use crate::basic::{Encoding, Type};
24use crate::data_type::private::ParquetValueType;
25use crate::data_type::DataType;
26use crate::encodings::encoding::{Encoder, PlainEncoder};
27use crate::encodings::rle::RleEncoder;
28use crate::errors::Result;
29use crate::schema::types::ColumnDescPtr;
30use crate::util::bit_util::num_required_bits;
31use crate::util::interner::{Interner, Storage};
32
33#[derive(Debug)]
34struct KeyStorage<T: DataType> {
35 uniques: Vec<T::T>,
36
37 size_in_bytes: usize,
39
40 type_length: usize,
41}
42
43impl<T: DataType> Storage for KeyStorage<T> {
44 type Key = u64;
45 type Value = T::T;
46
47 fn get(&self, idx: Self::Key) -> &Self::Value {
48 &self.uniques[idx as usize]
49 }
50
51 fn push(&mut self, value: &Self::Value) -> Self::Key {
52 let (base_size, num_elements) = value.dict_encoding_size();
53
54 let unique_size = match T::get_physical_type() {
55 Type::BYTE_ARRAY => base_size + num_elements,
56 Type::FIXED_LEN_BYTE_ARRAY => self.type_length,
57 _ => base_size,
58 };
59 self.size_in_bytes += unique_size;
60
61 let key = self.uniques.len() as u64;
62 self.uniques.push(value.clone());
63 key
64 }
65
66 fn estimated_memory_size(&self) -> usize {
67 self.size_in_bytes + self.uniques.capacity() * std::mem::size_of::<T::T>()
68 }
69}
70
71pub struct DictEncoder<T: DataType> {
82 interner: Interner<KeyStorage<T>>,
83
84 indices: Vec<u64>,
86}
87
88impl<T: DataType> DictEncoder<T> {
89 pub fn new(desc: ColumnDescPtr) -> Self {
91 let storage = KeyStorage {
92 uniques: vec![],
93 size_in_bytes: 0,
94 type_length: desc.type_length() as usize,
95 };
96
97 Self {
98 interner: Interner::new(storage),
99 indices: vec![],
100 }
101 }
102
103 pub fn is_sorted(&self) -> bool {
105 false
107 }
108
109 pub fn num_entries(&self) -> usize {
111 self.interner.storage().uniques.len()
112 }
113
114 pub fn dict_encoded_size(&self) -> usize {
116 self.interner.storage().size_in_bytes
117 }
118
119 pub fn write_dict(&self) -> Result<Bytes> {
122 let mut plain_encoder = PlainEncoder::<T>::new();
123 plain_encoder.put(&self.interner.storage().uniques)?;
124 plain_encoder.flush_buffer()
125 }
126
127 pub fn write_indices(&mut self) -> Result<Bytes> {
130 let buffer_len = self.estimated_data_encoded_size();
131 let mut buffer = Vec::with_capacity(buffer_len);
132 buffer.push(self.bit_width());
133
134 let mut encoder = RleEncoder::new_from_buf(self.bit_width(), buffer);
136 for index in &self.indices {
137 encoder.put(*index)
138 }
139 self.indices.clear();
140 Ok(encoder.consume().into())
141 }
142
143 fn put_one(&mut self, value: &T::T) {
144 self.indices.push(self.interner.intern(value));
145 }
146
147 #[inline]
148 fn bit_width(&self) -> u8 {
149 num_required_bits(self.num_entries().saturating_sub(1) as u64)
150 }
151}
152
153impl<T: DataType> Encoder<T> for DictEncoder<T> {
154 fn put(&mut self, values: &[T::T]) -> Result<()> {
155 self.indices.reserve(values.len());
156 for i in values {
157 self.put_one(i)
158 }
159 Ok(())
160 }
161
162 fn encoding(&self) -> Encoding {
166 Encoding::PLAIN_DICTIONARY
167 }
168
169 fn estimated_data_encoded_size(&self) -> usize {
174 let bit_width = self.bit_width();
175 RleEncoder::max_buffer_size(bit_width, self.indices.len())
176 }
177
178 fn flush_buffer(&mut self) -> Result<Bytes> {
179 self.write_indices()
180 }
181
182 fn estimated_memory_size(&self) -> usize {
186 self.interner.storage().size_in_bytes + self.indices.len() * std::mem::size_of::<usize>()
187 }
188}