1use crate::arrow::buffer::offset_buffer::OffsetBuffer;
19use crate::arrow::record_reader::buffer::ValuesBuffer;
20use crate::errors::{ParquetError, Result};
21use arrow_array::{Array, GenericByteArray, downcast_integer};
22use arrow_array::{
23 ArrayRef, FixedSizeBinaryArray, OffsetSizeTrait,
24 builder::{FixedSizeBinaryDictionaryBuilder, GenericByteDictionaryBuilder},
25 cast::AsArray,
26 make_array,
27 types::{ArrowDictionaryKeyType, ByteArrayType},
28};
29use arrow_buffer::{ArrowNativeType, Buffer};
30use arrow_data::ArrayDataBuilder;
31use arrow_schema::DataType as ArrowType;
32use std::sync::Arc;
33
34pub enum DictionaryBuffer<K: ArrowNativeType, V: OffsetSizeTrait> {
37 Dict { keys: Vec<K>, values: ArrayRef },
38 Values { values: OffsetBuffer<V> },
39}
40
41impl<K: ArrowNativeType + Ord, V: OffsetSizeTrait> DictionaryBuffer<K, V> {
42 #[allow(unused)]
43 pub fn len(&self) -> usize {
44 match self {
45 Self::Dict { keys, .. } => keys.len(),
46 Self::Values { values } => values.len(),
47 }
48 }
49
50 pub fn as_keys(&mut self, dictionary: &ArrayRef) -> Option<&mut Vec<K>> {
58 assert!(K::from_usize(dictionary.len()).is_some());
59
60 match self {
61 Self::Dict { keys, values } => {
62 let values_ptr = values.as_ref() as *const _ as *const ();
66 let dict_ptr = dictionary.as_ref() as *const _ as *const ();
67 if values_ptr == dict_ptr {
68 Some(keys)
69 } else if keys.is_empty() {
70 *values = Arc::clone(dictionary);
71 Some(keys)
72 } else {
73 None
74 }
75 }
76 Self::Values { values } if values.is_empty() => {
77 *self = Self::Dict {
78 keys: Default::default(),
79 values: Arc::clone(dictionary),
80 };
81 match self {
82 Self::Dict { keys, .. } => Some(keys),
83 _ => unreachable!(),
84 }
85 }
86 _ => None,
87 }
88 }
89
90 pub fn spill_values(&mut self) -> Result<&mut OffsetBuffer<V>> {
95 match self {
96 Self::Values { values } => Ok(values),
97 Self::Dict { keys, values } => {
98 let mut spilled = OffsetBuffer::with_capacity(0);
99 let data = values.to_data();
100 let dict_buffers = data.buffers();
101 let dict_offsets = dict_buffers[0].typed_data::<V>();
102 let dict_values = dict_buffers[1].as_slice();
103
104 if values.is_empty() {
105 spilled.offsets.resize(keys.len() + 1, V::default());
107 } else {
108 spilled.extend_from_dictionary(keys.as_slice(), dict_offsets, dict_values)?;
114 }
115
116 *self = Self::Values { values: spilled };
117 match self {
118 Self::Values { values } => Ok(values),
119 _ => unreachable!(),
120 }
121 }
122 }
123 }
124
125 pub fn into_array(
127 self,
128 null_buffer: Option<Buffer>,
129 data_type: &ArrowType,
130 ) -> Result<ArrayRef> {
131 assert!(matches!(data_type, ArrowType::Dictionary(_, _)));
132
133 match self {
134 Self::Dict { keys, values } => {
135 if !values.is_empty() {
137 let min = K::from_usize(0).unwrap();
138 let max = K::from_usize(values.len()).unwrap();
139
140 if !keys
144 .as_slice()
145 .iter()
146 .copied()
147 .fold(true, |a, x| a && x >= min && x < max)
148 {
149 return Err(general_err!(
150 "dictionary key beyond bounds of dictionary: 0..{}",
151 values.len()
152 ));
153 }
154 }
155
156 let ArrowType::Dictionary(_, value_type) = data_type else {
157 unreachable!()
158 };
159 let values = if let ArrowType::FixedSizeBinary(size) = **value_type {
160 let binary = values.as_binary::<i32>();
161 Arc::new(FixedSizeBinaryArray::new(
162 size,
163 binary.values().clone(),
164 binary.nulls().cloned(),
165 )) as _
166 } else {
167 values
168 };
169
170 let builder = ArrayDataBuilder::new(data_type.clone())
171 .len(keys.len())
172 .add_buffer(Buffer::from_vec(keys))
173 .add_child_data(values.into_data())
174 .null_bit_buffer(null_buffer);
175
176 let data = match cfg!(debug_assertions) {
177 true => builder.build().unwrap(),
178 false => unsafe { builder.build_unchecked() },
179 };
180
181 Ok(make_array(data))
182 }
183 Self::Values { values } => {
184 let (key_type, value_type) = match data_type {
185 ArrowType::Dictionary(k, v) => (k, v.as_ref().clone()),
186 _ => unreachable!(),
187 };
188
189 let array = values.into_array(null_buffer, value_type);
190 pack_values(key_type, &array)
191 }
192 }
193 }
194}
195
196impl<K: ArrowNativeType, V: OffsetSizeTrait> ValuesBuffer for DictionaryBuffer<K, V> {
197 fn with_capacity(capacity: usize) -> Self {
198 Self::Values {
199 values: OffsetBuffer::with_capacity(capacity),
200 }
201 }
202
203 fn reserve_exact(&mut self, additional: usize) {
204 match self {
205 Self::Dict { keys, .. } => keys.reserve_exact(additional),
206 Self::Values { values, .. } => values.reserve_exact(additional),
207 }
208 }
209
210 fn pad_nulls(
211 &mut self,
212 read_offset: usize,
213 values_read: usize,
214 levels_read: usize,
215 valid_mask: &[u8],
216 ) -> Result<()> {
217 match self {
218 Self::Dict { keys, .. } => {
219 keys.resize(read_offset + levels_read, K::default());
220 keys.pad_nulls(read_offset, values_read, levels_read, valid_mask)
221 }
222 Self::Values { values, .. } => {
223 values.pad_nulls(read_offset, values_read, levels_read, valid_mask)
224 }
225 }
226 }
227}
228
229macro_rules! dict_helper {
230 ($k:ty, $array:ident) => {
231 match $array.data_type() {
232 ArrowType::Utf8 => pack_values_impl::<$k, _>($array.as_string::<i32>()),
233 ArrowType::LargeUtf8 => pack_values_impl::<$k, _>($array.as_string::<i64>()),
234 ArrowType::Binary => pack_values_impl::<$k, _>($array.as_binary::<i32>()),
235 ArrowType::LargeBinary => pack_values_impl::<$k, _>($array.as_binary::<i64>()),
236 ArrowType::FixedSizeBinary(_) => {
237 pack_fixed_values_impl::<$k>($array.as_fixed_size_binary())
238 }
239 _ => unreachable!(),
240 }
241 };
242}
243
244fn pack_values(key_type: &ArrowType, values: &ArrayRef) -> Result<ArrayRef> {
245 downcast_integer! {
246 key_type => (dict_helper, values),
247 _ => unreachable!(),
248 }
249}
250
251fn pack_values_impl<K: ArrowDictionaryKeyType, T: ByteArrayType>(
252 array: &GenericByteArray<T>,
253) -> Result<ArrayRef> {
254 let mut builder = GenericByteDictionaryBuilder::<K, T>::with_capacity(array.len(), 1024, 1024);
255 for x in array {
256 match x {
257 Some(x) => builder.append_value(x),
258 None => builder.append_null(),
259 }
260 }
261 let raw = builder.finish();
262 Ok(Arc::new(raw))
263}
264
265fn pack_fixed_values_impl<K: ArrowDictionaryKeyType>(
266 array: &FixedSizeBinaryArray,
267) -> Result<ArrayRef> {
268 let mut builder = FixedSizeBinaryDictionaryBuilder::<K>::with_capacity(
269 array.len(),
270 1024,
271 array.value_length(),
272 );
273 for x in array {
274 match x {
275 Some(x) => builder.append_value(x),
276 None => builder.append_null(),
277 }
278 }
279 let raw = builder.finish();
280 Ok(Arc::new(raw))
281}
282
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::compute::cast;
    use arrow_array::StringArray;

    /// Returns the `Dictionary(Int32, Utf8)` type used throughout these tests.
    fn utf8_dict_type() -> ArrowType {
        ArrowType::Dictionary(Box::new(ArrowType::Int32), Box::new(ArrowType::Utf8))
    }

    /// Casts `array` to `Utf8` and returns the resulting string array.
    fn to_utf8(array: &ArrayRef) -> StringArray {
        let casted = cast(array, &ArrowType::Utf8).unwrap();
        casted
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap()
            .clone()
    }

    #[test]
    fn test_dictionary_buffer() {
        let dict_type = utf8_dict_type();
        let mut buffer = DictionaryBuffer::<i32, i32>::with_capacity(0);

        // Buffer keys against a first dictionary, then spread them over the levels.
        let d1: ArrayRef = Arc::new(StringArray::from(vec!["hello", "world", "", "a", "b"]));
        let keys = [1, 0, 3, 2, 4];
        buffer.as_keys(&d1).unwrap().extend_from_slice(&keys);

        let mut valid = vec![false, false, true, true, false, true, true, true];
        let mask = Buffer::from_iter(valid.iter().copied());
        buffer
            .pad_nulls(0, keys.len(), valid.len(), mask.as_slice())
            .unwrap();

        // Spill to materialized values and append raw values after the keys.
        let spilled = buffer.spill_values().unwrap();
        let read_offset = spilled.len();
        for raw in ["bingo", "bongo"] {
            spilled.try_push(raw.as_bytes(), false).unwrap();
        }

        valid.extend_from_slice(&[false, false, true, false, true]);
        let null_buffer = Buffer::from_iter(valid.iter().copied());
        buffer
            .pad_nulls(read_offset, 2, 5, null_buffer.as_slice())
            .unwrap();

        assert_eq!(buffer.len(), 13);
        let taken = std::mem::replace(&mut buffer, DictionaryBuffer::with_capacity(0));

        let array = taken.into_array(Some(null_buffer), &dict_type).unwrap();
        assert_eq!(array.data_type(), &dict_type);

        let strings = to_utf8(&array);
        let expected = vec![
            None,
            None,
            Some("world"),
            Some("hello"),
            None,
            Some("a"),
            Some(""),
            Some("b"),
            None,
            None,
            Some("bingo"),
            None,
            Some("bongo"),
        ];
        assert_eq!(strings.iter().collect::<Vec<_>>(), expected);

        // The replacement buffer starts empty in values mode and adopts a new dictionary.
        assert!(matches!(&buffer, DictionaryBuffer::Values { .. }));
        assert_eq!(buffer.len(), 0);
        let d2 = Arc::new(StringArray::from(vec!["bingo", ""])) as ArrayRef;
        buffer
            .as_keys(&d2)
            .unwrap()
            .extend_from_slice(&[0, 1, 0, 1]);

        let array = std::mem::replace(&mut buffer, DictionaryBuffer::with_capacity(0))
            .into_array(None, &dict_type)
            .unwrap();
        assert_eq!(array.data_type(), &dict_type);

        let strings = to_utf8(&array);
        assert_eq!(
            strings.iter().collect::<Vec<_>>(),
            vec![Some("bingo"), Some(""), Some("bingo"), Some("")]
        );

        // Once keys are buffered, a different dictionary is refused.
        assert!(matches!(&buffer, DictionaryBuffer::Values { .. }));
        assert_eq!(buffer.len(), 0);
        let d3 = Arc::new(StringArray::from(vec!["bongo"])) as ArrayRef;
        buffer.as_keys(&d3).unwrap().extend_from_slice(&[0, 0]);

        let d4 = Arc::new(StringArray::from(vec!["bananas"])) as ArrayRef;
        assert!(buffer.as_keys(&d4).is_none());
    }

    #[test]
    fn test_validates_keys() {
        let dict_type = utf8_dict_type();

        // An out-of-bounds key is rejected when building the dictionary array...
        let mut buffer = DictionaryBuffer::<i32, i32>::with_capacity(0);
        let d = Arc::new(StringArray::from(vec!["", "f"])) as ArrayRef;
        buffer.as_keys(&d).unwrap().extend_from_slice(&[0, 2, 0]);

        let err = buffer.into_array(None, &dict_type).unwrap_err().to_string();
        assert!(
            err.contains("dictionary key beyond bounds of dictionary: 0..2"),
            "{}",
            err
        );

        // ...and when spilling to materialized values.
        let mut buffer = DictionaryBuffer::<i32, i32>::with_capacity(0);
        let d = Arc::new(StringArray::from(vec![""])) as ArrayRef;
        buffer.as_keys(&d).unwrap().extend_from_slice(&[0, 1, 0]);

        let err = buffer.spill_values().unwrap_err().to_string();
        assert!(
            err.contains("dictionary key beyond bounds of dictionary: 0..1"),
            "{}",
            err
        );
    }
}