1use crate::arrow::buffer::offset_buffer::OffsetBuffer;
19use crate::arrow::record_reader::buffer::ValuesBuffer;
20use crate::errors::{ParquetError, Result};
21use arrow_array::{Array, GenericByteArray, downcast_integer};
22use arrow_array::{
23 ArrayRef, FixedSizeBinaryArray, OffsetSizeTrait,
24 builder::{FixedSizeBinaryDictionaryBuilder, GenericByteDictionaryBuilder},
25 cast::AsArray,
26 make_array,
27 types::{ArrowDictionaryKeyType, ByteArrayType},
28};
29use arrow_buffer::{ArrowNativeType, Buffer};
30use arrow_data::ArrayDataBuilder;
31use arrow_schema::DataType as ArrowType;
32use std::sync::Arc;
33
34pub enum DictionaryBuffer<K: ArrowNativeType, V: OffsetSizeTrait> {
37 Dict { keys: Vec<K>, values: ArrayRef },
38 Values { values: OffsetBuffer<V> },
39}
40
41impl<K: ArrowNativeType + Ord, V: OffsetSizeTrait> DictionaryBuffer<K, V> {
42 #[allow(unused)]
43 pub fn len(&self) -> usize {
44 match self {
45 Self::Dict { keys, .. } => keys.len(),
46 Self::Values { values } => values.len(),
47 }
48 }
49
50 pub fn as_keys(&mut self, dictionary: &ArrayRef) -> Option<&mut Vec<K>> {
58 assert!(K::from_usize(dictionary.len()).is_some());
59
60 match self {
61 Self::Dict { keys, values } => {
62 let values_ptr = values.as_ref() as *const _ as *const ();
66 let dict_ptr = dictionary.as_ref() as *const _ as *const ();
67 if values_ptr == dict_ptr {
68 Some(keys)
69 } else if keys.is_empty() {
70 *values = Arc::clone(dictionary);
71 Some(keys)
72 } else {
73 None
74 }
75 }
76 Self::Values { values } if values.is_empty() => {
77 *self = Self::Dict {
78 keys: Default::default(),
79 values: Arc::clone(dictionary),
80 };
81 match self {
82 Self::Dict { keys, .. } => Some(keys),
83 _ => unreachable!(),
84 }
85 }
86 _ => None,
87 }
88 }
89
90 pub fn spill_values(&mut self) -> Result<&mut OffsetBuffer<V>> {
95 match self {
96 Self::Values { values } => Ok(values),
97 Self::Dict { keys, values } => {
98 let mut spilled = OffsetBuffer::with_capacity(0);
99 let data = values.to_data();
100 let dict_buffers = data.buffers();
101 let dict_offsets = dict_buffers[0].typed_data::<V>();
102 let dict_values = dict_buffers[1].as_slice();
103
104 if values.is_empty() {
105 spilled.offsets.resize(keys.len() + 1, V::default());
107 } else {
108 spilled.extend_from_dictionary(keys.as_slice(), dict_offsets, dict_values)?;
114 }
115
116 *self = Self::Values { values: spilled };
117 match self {
118 Self::Values { values } => Ok(values),
119 _ => unreachable!(),
120 }
121 }
122 }
123 }
124
125 pub fn into_array(
127 self,
128 null_buffer: Option<Buffer>,
129 data_type: &ArrowType,
130 ) -> Result<ArrayRef> {
131 assert!(matches!(data_type, ArrowType::Dictionary(_, _)));
132
133 match self {
134 Self::Dict { keys, values } => {
135 if !values.is_empty() {
137 let min = K::from_usize(0).unwrap();
138 let max = K::from_usize(values.len()).unwrap();
139
140 if !keys
144 .as_slice()
145 .iter()
146 .copied()
147 .fold(true, |a, x| a && x >= min && x < max)
148 {
149 return Err(general_err!(
150 "dictionary key beyond bounds of dictionary: 0..{}",
151 values.len()
152 ));
153 }
154 }
155
156 let ArrowType::Dictionary(_, value_type) = data_type else {
157 unreachable!()
158 };
159 let values = if let ArrowType::FixedSizeBinary(size) = **value_type {
160 let binary = values.as_binary::<i32>();
161 Arc::new(FixedSizeBinaryArray::new(
162 size,
163 binary.values().clone(),
164 binary.nulls().cloned(),
165 )) as _
166 } else {
167 values
168 };
169
170 let builder = ArrayDataBuilder::new(data_type.clone())
171 .len(keys.len())
172 .add_buffer(Buffer::from_vec(keys))
173 .add_child_data(values.into_data())
174 .null_bit_buffer(null_buffer);
175
176 let data = match cfg!(debug_assertions) {
177 true => builder.build().unwrap(),
178 false => unsafe { builder.build_unchecked() },
179 };
180
181 Ok(make_array(data))
182 }
183 Self::Values { values } => {
184 let (key_type, value_type) = match data_type {
185 ArrowType::Dictionary(k, v) => (k, v.as_ref().clone()),
186 _ => unreachable!(),
187 };
188
189 let array = values.into_array(null_buffer, value_type);
190 pack_values(key_type, &array)
191 }
192 }
193 }
194}
195
196impl<K: ArrowNativeType, V: OffsetSizeTrait> ValuesBuffer for DictionaryBuffer<K, V> {
197 fn with_capacity(capacity: usize) -> Self {
198 Self::Values {
199 values: OffsetBuffer::with_capacity(capacity),
200 }
201 }
202
203 fn pad_nulls(
204 &mut self,
205 read_offset: usize,
206 values_read: usize,
207 levels_read: usize,
208 valid_mask: &[u8],
209 ) {
210 match self {
211 Self::Dict { keys, .. } => {
212 keys.resize(read_offset + levels_read, K::default());
213 keys.pad_nulls(read_offset, values_read, levels_read, valid_mask)
214 }
215 Self::Values { values, .. } => {
216 values.pad_nulls(read_offset, values_read, levels_read, valid_mask)
217 }
218 }
219 }
220}
221
// Dispatches on the data type of `$values` and packs it into a dictionary
// array keyed by `$key`. Only string, binary and fixed-size binary inputs are
// handled; any other data type hits `unreachable!()`.
macro_rules! dict_helper {
    ($key:ty, $values:ident) => {
        match $values.data_type() {
            // Variable-length binary
            ArrowType::Binary => pack_values_impl::<$key, _>($values.as_binary::<i32>()),
            ArrowType::LargeBinary => pack_values_impl::<$key, _>($values.as_binary::<i64>()),
            // UTF-8 strings
            ArrowType::Utf8 => pack_values_impl::<$key, _>($values.as_string::<i32>()),
            ArrowType::LargeUtf8 => pack_values_impl::<$key, _>($values.as_string::<i64>()),
            // Fixed-width binary
            ArrowType::FixedSizeBinary(_) => {
                pack_fixed_values_impl::<$key>($values.as_fixed_size_binary())
            }
            _ => unreachable!(),
        }
    };
}
236
237fn pack_values(key_type: &ArrowType, values: &ArrayRef) -> Result<ArrayRef> {
238 downcast_integer! {
239 key_type => (dict_helper, values),
240 _ => unreachable!(),
241 }
242}
243
244fn pack_values_impl<K: ArrowDictionaryKeyType, T: ByteArrayType>(
245 array: &GenericByteArray<T>,
246) -> Result<ArrayRef> {
247 let mut builder = GenericByteDictionaryBuilder::<K, T>::with_capacity(array.len(), 1024, 1024);
248 for x in array {
249 match x {
250 Some(x) => builder.append_value(x),
251 None => builder.append_null(),
252 }
253 }
254 let raw = builder.finish();
255 Ok(Arc::new(raw))
256}
257
258fn pack_fixed_values_impl<K: ArrowDictionaryKeyType>(
259 array: &FixedSizeBinaryArray,
260) -> Result<ArrayRef> {
261 let mut builder = FixedSizeBinaryDictionaryBuilder::<K>::with_capacity(
262 array.len(),
263 1024,
264 array.value_length(),
265 );
266 for x in array {
267 match x {
268 Some(x) => builder.append_value(x),
269 None => builder.append_null(),
270 }
271 }
272 let raw = builder.finish();
273 Ok(Arc::new(raw))
274}
275
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::compute::cast;
    use arrow_array::StringArray;

    /// The `Dictionary(Int32, Utf8)` type used by every test in this module.
    fn utf8_dict_type() -> ArrowType {
        ArrowType::Dictionary(Box::new(ArrowType::Int32), Box::new(ArrowType::Utf8))
    }

    /// Casts `array` to `Utf8` and checks its contents against `expected`.
    fn assert_strings(array: &ArrayRef, expected: &[Option<&str>]) {
        let utf8 = cast(array, &ArrowType::Utf8).unwrap();
        let strings = utf8.as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(strings.iter().collect::<Vec<_>>(), expected.to_vec());
    }

    #[test]
    fn test_dictionary_buffer() {
        let dict_type = utf8_dict_type();

        let d1: ArrayRef = Arc::new(StringArray::from(vec!["hello", "world", "", "a", "b"]));

        let mut buffer = DictionaryBuffer::<i32, i32>::with_capacity(0);

        // Read some dictionary keys
        let keys = &[1, 0, 3, 2, 4];
        buffer.as_keys(&d1).unwrap().extend_from_slice(keys);

        let mut valid = vec![false, false, true, true, false, true, true, true];
        let valid_buffer = Buffer::from_iter(valid.iter().cloned());
        buffer.pad_nulls(0, keys.len(), valid.len(), valid_buffer.as_slice());

        // Spill to plain values and append two more
        let spilled = buffer.spill_values().unwrap();
        let read_offset = spilled.len();
        spilled.try_push("bingo".as_bytes(), false).unwrap();
        spilled.try_push("bongo".as_bytes(), false).unwrap();

        valid.extend_from_slice(&[false, false, true, false, true]);
        let null_buffer = Buffer::from_iter(valid.iter().cloned());
        buffer.pad_nulls(read_offset, 2, 5, null_buffer.as_slice());

        assert_eq!(buffer.len(), 13);
        let taken = std::mem::replace(&mut buffer, DictionaryBuffer::with_capacity(0));

        let array = taken.into_array(Some(null_buffer), &dict_type).unwrap();
        assert_eq!(array.data_type(), &dict_type);
        assert_strings(
            &array,
            &[
                None,
                None,
                Some("world"),
                Some("hello"),
                None,
                Some("a"),
                Some(""),
                Some("b"),
                None,
                None,
                Some("bingo"),
                None,
                Some("bongo"),
            ],
        );

        // The replacement buffer is empty, so it accepts a new dictionary
        assert!(matches!(&buffer, DictionaryBuffer::Values { .. }));
        assert_eq!(buffer.len(), 0);
        let d2: ArrayRef = Arc::new(StringArray::from(vec!["bingo", ""]));
        buffer
            .as_keys(&d2)
            .unwrap()
            .extend_from_slice(&[0, 1, 0, 1]);

        let taken = std::mem::replace(&mut buffer, DictionaryBuffer::with_capacity(0));
        let array = taken.into_array(None, &dict_type).unwrap();
        assert_eq!(array.data_type(), &dict_type);
        assert_strings(
            &array,
            &[Some("bingo"), Some(""), Some("bingo"), Some("")],
        );

        // Again empty, so another dictionary can be installed
        assert!(matches!(&buffer, DictionaryBuffer::Values { .. }));
        assert_eq!(buffer.len(), 0);
        let d3: ArrayRef = Arc::new(StringArray::from(vec!["bongo"]));
        buffer.as_keys(&d3).unwrap().extend_from_slice(&[0, 0]);

        // Keys are now present, so a different dictionary must be rejected
        let d4: ArrayRef = Arc::new(StringArray::from(vec!["bananas"]));
        assert!(buffer.as_keys(&d4).is_none());
    }

    #[test]
    fn test_validates_keys() {
        let dict_type = utf8_dict_type();

        // An out-of-range key is caught when building the dictionary array
        let mut buffer = DictionaryBuffer::<i32, i32>::with_capacity(0);
        let dictionary: ArrayRef = Arc::new(StringArray::from(vec!["", "f"]));
        buffer
            .as_keys(&dictionary)
            .unwrap()
            .extend_from_slice(&[0, 2, 0]);

        let message = buffer.into_array(None, &dict_type).unwrap_err().to_string();
        assert!(
            message.contains("dictionary key beyond bounds of dictionary: 0..2"),
            "{}",
            message
        );

        // An out-of-range key is also caught when spilling to values
        let mut buffer = DictionaryBuffer::<i32, i32>::with_capacity(0);
        let dictionary: ArrayRef = Arc::new(StringArray::from(vec![""]));
        buffer
            .as_keys(&dictionary)
            .unwrap()
            .extend_from_slice(&[0, 1, 0]);

        let message = buffer.spill_values().unwrap_err().to_string();
        assert!(
            message.contains("dictionary key beyond bounds of dictionary: 0..1"),
            "{}",
            message
        );
    }
}