1use crate::arrow::buffer::offset_buffer::OffsetBuffer;
19use crate::arrow::record_reader::buffer::ValuesBuffer;
20use crate::errors::{ParquetError, Result};
21use arrow_array::{Array, GenericByteArray, downcast_integer};
22use arrow_array::{
23 ArrayRef, FixedSizeBinaryArray, OffsetSizeTrait,
24 builder::{FixedSizeBinaryDictionaryBuilder, GenericByteDictionaryBuilder},
25 cast::AsArray,
26 make_array,
27 types::{ArrowDictionaryKeyType, ByteArrayType},
28};
29use arrow_buffer::{ArrowNativeType, Buffer};
30use arrow_data::ArrayDataBuilder;
31use arrow_schema::DataType as ArrowType;
32use std::sync::Arc;
33
34pub enum DictionaryBuffer<K: ArrowNativeType, V: OffsetSizeTrait> {
37 Dict { keys: Vec<K>, values: ArrayRef },
38 Values { values: OffsetBuffer<V> },
39}
40
41impl<K: ArrowNativeType, V: OffsetSizeTrait> Default for DictionaryBuffer<K, V> {
42 fn default() -> Self {
43 Self::Values {
44 values: Default::default(),
45 }
46 }
47}
48
49impl<K: ArrowNativeType + Ord, V: OffsetSizeTrait> DictionaryBuffer<K, V> {
50 #[allow(unused)]
51 pub fn len(&self) -> usize {
52 match self {
53 Self::Dict { keys, .. } => keys.len(),
54 Self::Values { values } => values.len(),
55 }
56 }
57
58 pub fn as_keys(&mut self, dictionary: &ArrayRef) -> Option<&mut Vec<K>> {
66 assert!(K::from_usize(dictionary.len()).is_some());
67
68 match self {
69 Self::Dict { keys, values } => {
70 let values_ptr = values.as_ref() as *const _ as *const ();
74 let dict_ptr = dictionary.as_ref() as *const _ as *const ();
75 if values_ptr == dict_ptr {
76 Some(keys)
77 } else if keys.is_empty() {
78 *values = Arc::clone(dictionary);
79 Some(keys)
80 } else {
81 None
82 }
83 }
84 Self::Values { values } if values.is_empty() => {
85 *self = Self::Dict {
86 keys: Default::default(),
87 values: Arc::clone(dictionary),
88 };
89 match self {
90 Self::Dict { keys, .. } => Some(keys),
91 _ => unreachable!(),
92 }
93 }
94 _ => None,
95 }
96 }
97
98 pub fn spill_values(&mut self) -> Result<&mut OffsetBuffer<V>> {
103 match self {
104 Self::Values { values } => Ok(values),
105 Self::Dict { keys, values } => {
106 let mut spilled = OffsetBuffer::default();
107 let data = values.to_data();
108 let dict_buffers = data.buffers();
109 let dict_offsets = dict_buffers[0].typed_data::<V>();
110 let dict_values = dict_buffers[1].as_slice();
111
112 if values.is_empty() {
113 spilled.offsets.resize(keys.len() + 1, V::default());
115 } else {
116 spilled.extend_from_dictionary(keys.as_slice(), dict_offsets, dict_values)?;
122 }
123
124 *self = Self::Values { values: spilled };
125 match self {
126 Self::Values { values } => Ok(values),
127 _ => unreachable!(),
128 }
129 }
130 }
131 }
132
133 pub fn into_array(
135 self,
136 null_buffer: Option<Buffer>,
137 data_type: &ArrowType,
138 ) -> Result<ArrayRef> {
139 assert!(matches!(data_type, ArrowType::Dictionary(_, _)));
140
141 match self {
142 Self::Dict { keys, values } => {
143 if !values.is_empty() {
145 let min = K::from_usize(0).unwrap();
146 let max = K::from_usize(values.len()).unwrap();
147
148 if !keys
152 .as_slice()
153 .iter()
154 .copied()
155 .fold(true, |a, x| a && x >= min && x < max)
156 {
157 return Err(general_err!(
158 "dictionary key beyond bounds of dictionary: 0..{}",
159 values.len()
160 ));
161 }
162 }
163
164 let ArrowType::Dictionary(_, value_type) = data_type else {
165 unreachable!()
166 };
167 let values = if let ArrowType::FixedSizeBinary(size) = **value_type {
168 let binary = values.as_binary::<i32>();
169 Arc::new(FixedSizeBinaryArray::new(
170 size,
171 binary.values().clone(),
172 binary.nulls().cloned(),
173 )) as _
174 } else {
175 values
176 };
177
178 let builder = ArrayDataBuilder::new(data_type.clone())
179 .len(keys.len())
180 .add_buffer(Buffer::from_vec(keys))
181 .add_child_data(values.into_data())
182 .null_bit_buffer(null_buffer);
183
184 let data = match cfg!(debug_assertions) {
185 true => builder.build().unwrap(),
186 false => unsafe { builder.build_unchecked() },
187 };
188
189 Ok(make_array(data))
190 }
191 Self::Values { values } => {
192 let (key_type, value_type) = match data_type {
193 ArrowType::Dictionary(k, v) => (k, v.as_ref().clone()),
194 _ => unreachable!(),
195 };
196
197 let array = values.into_array(null_buffer, value_type);
198 pack_values(key_type, &array)
199 }
200 }
201 }
202}
203
204impl<K: ArrowNativeType, V: OffsetSizeTrait> ValuesBuffer for DictionaryBuffer<K, V> {
205 fn pad_nulls(
206 &mut self,
207 read_offset: usize,
208 values_read: usize,
209 levels_read: usize,
210 valid_mask: &[u8],
211 ) {
212 match self {
213 Self::Dict { keys, .. } => {
214 keys.resize(read_offset + levels_read, K::default());
215 keys.pad_nulls(read_offset, values_read, levels_read, valid_mask)
216 }
217 Self::Values { values, .. } => {
218 values.pad_nulls(read_offset, values_read, levels_read, valid_mask)
219 }
220 }
221 }
222}
223
/// Packs `$values` into a dictionary array keyed by `$key`, dispatching on the
/// data type of `$values`.
///
/// Only string, binary and fixed-size binary value types are handled; any other
/// type hits `unreachable!()`.
macro_rules! dict_helper {
    ($key:ty, $values:ident) => {
        match $values.data_type() {
            ArrowType::Binary => pack_values_impl::<$key, _>($values.as_binary::<i32>()),
            ArrowType::LargeBinary => pack_values_impl::<$key, _>($values.as_binary::<i64>()),
            ArrowType::Utf8 => pack_values_impl::<$key, _>($values.as_string::<i32>()),
            ArrowType::LargeUtf8 => pack_values_impl::<$key, _>($values.as_string::<i64>()),
            ArrowType::FixedSizeBinary(_) => {
                pack_fixed_values_impl::<$key>($values.as_fixed_size_binary())
            }
            _ => unreachable!(),
        }
    };
}
238
239fn pack_values(key_type: &ArrowType, values: &ArrayRef) -> Result<ArrayRef> {
240 downcast_integer! {
241 key_type => (dict_helper, values),
242 _ => unreachable!(),
243 }
244}
245
246fn pack_values_impl<K: ArrowDictionaryKeyType, T: ByteArrayType>(
247 array: &GenericByteArray<T>,
248) -> Result<ArrayRef> {
249 let mut builder = GenericByteDictionaryBuilder::<K, T>::with_capacity(array.len(), 1024, 1024);
250 for x in array {
251 match x {
252 Some(x) => builder.append_value(x),
253 None => builder.append_null(),
254 }
255 }
256 let raw = builder.finish();
257 Ok(Arc::new(raw))
258}
259
260fn pack_fixed_values_impl<K: ArrowDictionaryKeyType>(
261 array: &FixedSizeBinaryArray,
262) -> Result<ArrayRef> {
263 let mut builder = FixedSizeBinaryDictionaryBuilder::<K>::with_capacity(
264 array.len(),
265 1024,
266 array.value_length(),
267 );
268 for x in array {
269 match x {
270 Some(x) => builder.append_value(x),
271 None => builder.append_null(),
272 }
273 }
274 let raw = builder.finish();
275 Ok(Arc::new(raw))
276}
277
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::compute::cast;
    use arrow_array::StringArray;

    /// Dictionary type used throughout: `Int32` keys over `Utf8` values.
    fn utf8_dictionary_type() -> ArrowType {
        ArrowType::Dictionary(Box::new(ArrowType::Int32), Box::new(ArrowType::Utf8))
    }

    /// Wraps the given strings in a `StringArray` behind an `ArrayRef`.
    fn string_dictionary(entries: Vec<&str>) -> ArrayRef {
        Arc::new(StringArray::from(entries))
    }

    /// Casts `array` to `Utf8` and checks its contents against `expected`.
    fn assert_decodes_to(array: &ArrayRef, expected: Vec<Option<&str>>) {
        let decoded = cast(array, &ArrowType::Utf8).unwrap();
        let decoded = decoded.as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(decoded.iter().collect::<Vec<_>>(), expected);
    }

    #[test]
    fn test_dictionary_buffer() {
        let dict_type = utf8_dictionary_type();
        let d1 = string_dictionary(vec!["hello", "world", "", "a", "b"]);

        let mut buffer = DictionaryBuffer::<i32, i32>::default();

        // Append keys against the first dictionary, then pad for nulls
        let keys = [1, 0, 3, 2, 4];
        buffer.as_keys(&d1).unwrap().extend_from_slice(&keys);

        let mut validity = vec![false, false, true, true, false, true, true, true];
        let first_mask = Buffer::from_iter(validity.iter().copied());
        buffer.pad_nulls(0, keys.len(), validity.len(), first_mask.as_slice());

        // Spill to plain values and append two more entries
        let spilled = buffer.spill_values().unwrap();
        let read_offset = spilled.len();
        spilled.try_push("bingo".as_bytes(), false).unwrap();
        spilled.try_push("bongo".as_bytes(), false).unwrap();

        validity.extend_from_slice(&[false, false, true, false, true]);
        let null_buffer = Buffer::from_iter(validity.iter().copied());
        buffer.pad_nulls(read_offset, 2, 5, null_buffer.as_slice());

        assert_eq!(buffer.len(), 13);
        let taken = std::mem::take(&mut buffer);

        let array = taken.into_array(Some(null_buffer), &dict_type).unwrap();
        assert_eq!(array.data_type(), &dict_type);
        assert_decodes_to(
            &array,
            vec![
                None,
                None,
                Some("world"),
                Some("hello"),
                None,
                Some("a"),
                Some(""),
                Some("b"),
                None,
                None,
                Some("bingo"),
                None,
                Some("bongo"),
            ],
        );

        // Taking the buffer leaves an empty values buffer that accepts keys again
        assert!(matches!(&buffer, DictionaryBuffer::Values { .. }));
        assert_eq!(buffer.len(), 0);
        let d2 = string_dictionary(vec!["bingo", ""]);
        buffer
            .as_keys(&d2)
            .unwrap()
            .extend_from_slice(&[0, 1, 0, 1]);

        let array = std::mem::take(&mut buffer)
            .into_array(None, &dict_type)
            .unwrap();
        assert_eq!(array.data_type(), &dict_type);
        assert_decodes_to(
            &array,
            vec![Some("bingo"), Some(""), Some("bingo"), Some("")],
        );

        // Once keys are present, a different dictionary must be refused
        assert!(matches!(&buffer, DictionaryBuffer::Values { .. }));
        assert_eq!(buffer.len(), 0);
        let d3 = string_dictionary(vec!["bongo"]);
        buffer.as_keys(&d3).unwrap().extend_from_slice(&[0, 0]);

        let d4 = string_dictionary(vec!["bananas"]);
        assert!(buffer.as_keys(&d4).is_none());
    }

    #[test]
    fn test_validates_keys() {
        let dict_type = utf8_dictionary_type();

        // Out-of-range key detected when building the array
        let mut buffer = DictionaryBuffer::<i32, i32>::default();
        let dictionary = string_dictionary(vec!["", "f"]);
        buffer
            .as_keys(&dictionary)
            .unwrap()
            .extend_from_slice(&[0, 2, 0]);

        let message = buffer.into_array(None, &dict_type).unwrap_err().to_string();
        assert!(
            message.contains("dictionary key beyond bounds of dictionary: 0..2"),
            "{}",
            message
        );

        // Out-of-range key detected when spilling to values
        let mut buffer = DictionaryBuffer::<i32, i32>::default();
        let dictionary = string_dictionary(vec![""]);
        buffer
            .as_keys(&dictionary)
            .unwrap()
            .extend_from_slice(&[0, 1, 0]);

        let message = buffer.spill_values().unwrap_err().to_string();
        assert!(
            message.contains("dictionary key beyond bounds of dictionary: 0..1"),
            "{}",
            message
        );
    }
}