Skip to main content

parquet/arrow/buffer/
dictionary_buffer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::arrow::buffer::offset_buffer::OffsetBuffer;
19use crate::arrow::record_reader::buffer::ValuesBuffer;
20use crate::errors::{ParquetError, Result};
21use arrow_array::{Array, GenericByteArray, downcast_integer};
22use arrow_array::{
23    ArrayRef, FixedSizeBinaryArray, OffsetSizeTrait,
24    builder::{FixedSizeBinaryDictionaryBuilder, GenericByteDictionaryBuilder},
25    cast::AsArray,
26    make_array,
27    types::{ArrowDictionaryKeyType, ByteArrayType},
28};
29use arrow_buffer::{ArrowNativeType, Buffer};
30use arrow_data::ArrayDataBuilder;
31use arrow_schema::DataType as ArrowType;
32use std::sync::Arc;
33
34/// An array of variable length byte arrays that are potentially dictionary encoded
35/// and can be converted into a corresponding [`ArrayRef`]
36pub enum DictionaryBuffer<K: ArrowNativeType, V: OffsetSizeTrait> {
37    Dict { keys: Vec<K>, values: ArrayRef },
38    Values { values: OffsetBuffer<V> },
39}
40
41impl<K: ArrowNativeType, V: OffsetSizeTrait> Default for DictionaryBuffer<K, V> {
42    fn default() -> Self {
43        Self::Values {
44            values: Default::default(),
45        }
46    }
47}
48
49impl<K: ArrowNativeType + Ord, V: OffsetSizeTrait> DictionaryBuffer<K, V> {
50    #[allow(unused)]
51    pub fn len(&self) -> usize {
52        match self {
53            Self::Dict { keys, .. } => keys.len(),
54            Self::Values { values } => values.len(),
55        }
56    }
57
58    /// Returns a mutable reference to a keys array
59    ///
60    /// Returns None if the dictionary needs to be recomputed
61    ///
62    /// # Panic
63    ///
64    /// Panics if the dictionary is too large for `K`
65    pub fn as_keys(&mut self, dictionary: &ArrayRef) -> Option<&mut Vec<K>> {
66        assert!(K::from_usize(dictionary.len()).is_some());
67
68        match self {
69            Self::Dict { keys, values } => {
70                // Need to discard fat pointer for equality check
71                // - https://stackoverflow.com/a/67114787
72                // - https://github.com/rust-lang/rust/issues/46139
73                let values_ptr = values.as_ref() as *const _ as *const ();
74                let dict_ptr = dictionary.as_ref() as *const _ as *const ();
75                if values_ptr == dict_ptr {
76                    Some(keys)
77                } else if keys.is_empty() {
78                    *values = Arc::clone(dictionary);
79                    Some(keys)
80                } else {
81                    None
82                }
83            }
84            Self::Values { values } if values.is_empty() => {
85                *self = Self::Dict {
86                    keys: Default::default(),
87                    values: Arc::clone(dictionary),
88                };
89                match self {
90                    Self::Dict { keys, .. } => Some(keys),
91                    _ => unreachable!(),
92                }
93            }
94            _ => None,
95        }
96    }
97
98    /// Returns a mutable reference to a values array
99    ///
100    /// If this is currently dictionary encoded, this will convert from the
101    /// dictionary encoded representation
102    pub fn spill_values(&mut self) -> Result<&mut OffsetBuffer<V>> {
103        match self {
104            Self::Values { values } => Ok(values),
105            Self::Dict { keys, values } => {
106                let mut spilled = OffsetBuffer::default();
107                let data = values.to_data();
108                let dict_buffers = data.buffers();
109                let dict_offsets = dict_buffers[0].typed_data::<V>();
110                let dict_values = dict_buffers[1].as_slice();
111
112                if values.is_empty() {
113                    // If dictionary is empty, zero pad offsets
114                    spilled.offsets.resize(keys.len() + 1, V::default());
115                } else {
116                    // Note: at this point null positions will have arbitrary dictionary keys
117                    // and this will hydrate them to the corresponding byte array. This is
118                    // likely sub-optimal, as we would prefer zero length null "slots", but
119                    // spilling is already a degenerate case and so it is unclear if this is
120                    // worth optimising for, e.g. by keeping a null mask around
121                    spilled.extend_from_dictionary(keys.as_slice(), dict_offsets, dict_values)?;
122                }
123
124                *self = Self::Values { values: spilled };
125                match self {
126                    Self::Values { values } => Ok(values),
127                    _ => unreachable!(),
128                }
129            }
130        }
131    }
132
133    /// Converts this into an [`ArrayRef`] with the provided `data_type` and `null_buffer`
134    pub fn into_array(
135        self,
136        null_buffer: Option<Buffer>,
137        data_type: &ArrowType,
138    ) -> Result<ArrayRef> {
139        assert!(matches!(data_type, ArrowType::Dictionary(_, _)));
140
141        match self {
142            Self::Dict { keys, values } => {
143                // Validate keys unless dictionary is empty
144                if !values.is_empty() {
145                    let min = K::from_usize(0).unwrap();
146                    let max = K::from_usize(values.len()).unwrap();
147
148                    // using copied and fold gets auto-vectorized since rust 1.70
149                    // all/any would allow early exit on invalid values
150                    // but in the happy case all values have to be checked anyway
151                    if !keys
152                        .as_slice()
153                        .iter()
154                        .copied()
155                        .fold(true, |a, x| a && x >= min && x < max)
156                    {
157                        return Err(general_err!(
158                            "dictionary key beyond bounds of dictionary: 0..{}",
159                            values.len()
160                        ));
161                    }
162                }
163
164                let ArrowType::Dictionary(_, value_type) = data_type else {
165                    unreachable!()
166                };
167                let values = if let ArrowType::FixedSizeBinary(size) = **value_type {
168                    let binary = values.as_binary::<i32>();
169                    Arc::new(FixedSizeBinaryArray::new(
170                        size,
171                        binary.values().clone(),
172                        binary.nulls().cloned(),
173                    )) as _
174                } else {
175                    values
176                };
177
178                let builder = ArrayDataBuilder::new(data_type.clone())
179                    .len(keys.len())
180                    .add_buffer(Buffer::from_vec(keys))
181                    .add_child_data(values.into_data())
182                    .null_bit_buffer(null_buffer);
183
184                let data = match cfg!(debug_assertions) {
185                    true => builder.build().unwrap(),
186                    false => unsafe { builder.build_unchecked() },
187                };
188
189                Ok(make_array(data))
190            }
191            Self::Values { values } => {
192                let (key_type, value_type) = match data_type {
193                    ArrowType::Dictionary(k, v) => (k, v.as_ref().clone()),
194                    _ => unreachable!(),
195                };
196
197                let array = values.into_array(null_buffer, value_type);
198                pack_values(key_type, &array)
199            }
200        }
201    }
202}
203
204impl<K: ArrowNativeType, V: OffsetSizeTrait> ValuesBuffer for DictionaryBuffer<K, V> {
205    fn pad_nulls(
206        &mut self,
207        read_offset: usize,
208        values_read: usize,
209        levels_read: usize,
210        valid_mask: &[u8],
211    ) {
212        match self {
213            Self::Dict { keys, .. } => {
214                keys.resize(read_offset + levels_read, K::default());
215                keys.pad_nulls(read_offset, values_read, levels_read, valid_mask)
216            }
217            Self::Values { values, .. } => {
218                values.pad_nulls(read_offset, values_read, levels_read, valid_mask)
219            }
220        }
221    }
222}
223
/// Dispatches on the data type of the array named by `$values` and dictionary
/// encodes it with key type `$key`.
///
/// Variable length string and binary arrays go through `pack_values_impl`,
/// fixed size binary arrays through `pack_fixed_values_impl`. Any other data
/// type hits `unreachable!()`.
macro_rules! dict_helper {
    ($key:ty, $values:ident) => {
        match $values.data_type() {
            // 32-bit offsets
            ArrowType::Utf8 => pack_values_impl::<$key, _>($values.as_string::<i32>()),
            ArrowType::Binary => pack_values_impl::<$key, _>($values.as_binary::<i32>()),
            // 64-bit offsets
            ArrowType::LargeUtf8 => pack_values_impl::<$key, _>($values.as_string::<i64>()),
            ArrowType::LargeBinary => pack_values_impl::<$key, _>($values.as_binary::<i64>()),
            // Fixed width values
            ArrowType::FixedSizeBinary(_) => {
                pack_fixed_values_impl::<$key>($values.as_fixed_size_binary())
            }
            _ => unreachable!(),
        }
    };
}
238
239fn pack_values(key_type: &ArrowType, values: &ArrayRef) -> Result<ArrayRef> {
240    downcast_integer! {
241        key_type => (dict_helper, values),
242            _ => unreachable!(),
243    }
244}
245
246fn pack_values_impl<K: ArrowDictionaryKeyType, T: ByteArrayType>(
247    array: &GenericByteArray<T>,
248) -> Result<ArrayRef> {
249    let mut builder = GenericByteDictionaryBuilder::<K, T>::with_capacity(array.len(), 1024, 1024);
250    for x in array {
251        match x {
252            Some(x) => builder.append_value(x),
253            None => builder.append_null(),
254        }
255    }
256    let raw = builder.finish();
257    Ok(Arc::new(raw))
258}
259
260fn pack_fixed_values_impl<K: ArrowDictionaryKeyType>(
261    array: &FixedSizeBinaryArray,
262) -> Result<ArrayRef> {
263    let mut builder = FixedSizeBinaryDictionaryBuilder::<K>::with_capacity(
264        array.len(),
265        1024,
266        array.value_length(),
267    );
268    for x in array {
269        match x {
270            Some(x) => builder.append_value(x),
271            None => builder.append_null(),
272        }
273    }
274    let raw = builder.finish();
275    Ok(Arc::new(raw))
276}
277
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::compute::cast;
    use arrow_array::StringArray;

    /// `Dictionary(Int32, Utf8)`, the output type requested by every test here
    fn int32_utf8_dictionary() -> ArrowType {
        ArrowType::Dictionary(Box::new(ArrowType::Int32), Box::new(ArrowType::Utf8))
    }

    /// Wraps the given strings in a [`StringArray`] to be used as a dictionary
    fn string_dictionary(entries: Vec<&str>) -> ArrayRef {
        Arc::new(StringArray::from(entries))
    }

    /// Casts `array` to `Utf8` and returns the resulting [`StringArray`]
    fn decode_utf8(array: &ArrayRef) -> StringArray {
        let casted = cast(array, &ArrowType::Utf8).unwrap();
        let strings = casted.as_any().downcast_ref::<StringArray>().unwrap();
        strings.clone()
    }

    #[test]
    fn test_dictionary_buffer() {
        let dict_type = int32_utf8_dictionary();
        let d1 = string_dictionary(vec!["hello", "world", "", "a", "b"]);

        let mut buffer = DictionaryBuffer::<i32, i32>::default();

        // Read some data preserving the dictionary
        let keys = &[1, 0, 3, 2, 4];
        buffer.as_keys(&d1).unwrap().extend_from_slice(keys);

        let mut valid = vec![false, false, true, true, false, true, true, true];
        let valid_buffer = Buffer::from_iter(valid.iter().copied());
        buffer.pad_nulls(0, keys.len(), valid.len(), valid_buffer.as_slice());

        // Read some data not preserving the dictionary
        let spilled = buffer.spill_values().unwrap();
        let read_offset = spilled.len();
        for word in ["bingo", "bongo"] {
            spilled.try_push(word.as_bytes(), false).unwrap();
        }

        valid.extend_from_slice(&[false, false, true, false, true]);
        let null_buffer = Buffer::from_iter(valid.iter().copied());
        buffer.pad_nulls(read_offset, 2, 5, null_buffer.as_slice());

        assert_eq!(buffer.len(), 13);

        // Taking the buffer leaves an empty default one behind
        let split = std::mem::take(&mut buffer);
        let array = split.into_array(Some(null_buffer), &dict_type).unwrap();
        assert_eq!(array.data_type(), &dict_type);

        let strings = decode_utf8(&array);
        let expected = vec![
            None,
            None,
            Some("world"),
            Some("hello"),
            None,
            Some("a"),
            Some(""),
            Some("b"),
            None,
            None,
            Some("bingo"),
            None,
            Some("bongo"),
        ];
        assert_eq!(strings.iter().collect::<Vec<_>>(), expected);

        // Can recreate with new dictionary as values is empty
        assert!(matches!(&buffer, DictionaryBuffer::Values { .. }));
        assert_eq!(buffer.len(), 0);
        let d2 = string_dictionary(vec!["bingo", ""]);
        let d2_keys = buffer.as_keys(&d2).unwrap();
        d2_keys.extend_from_slice(&[0, 1, 0, 1]);

        let array = std::mem::take(&mut buffer)
            .into_array(None, &dict_type)
            .unwrap();
        assert_eq!(array.data_type(), &dict_type);

        let strings = decode_utf8(&array);
        assert_eq!(
            strings.iter().collect::<Vec<_>>(),
            vec![Some("bingo"), Some(""), Some("bingo"), Some("")]
        );

        // Can recreate with new dictionary as keys empty
        assert!(matches!(&buffer, DictionaryBuffer::Values { .. }));
        assert_eq!(buffer.len(), 0);
        let d3 = string_dictionary(vec!["bongo"]);
        buffer.as_keys(&d3).unwrap().extend_from_slice(&[0, 0]);

        // Cannot change dictionary as keys not empty
        let d4 = string_dictionary(vec!["bananas"]);
        assert!(buffer.as_keys(&d4).is_none());
    }

    #[test]
    fn test_validates_keys() {
        let dict_type = int32_utf8_dictionary();

        // Key 2 is out of range for a two entry dictionary: caught by `into_array`
        let mut buffer = DictionaryBuffer::<i32, i32>::default();
        let dictionary = string_dictionary(vec!["", "f"]);
        buffer
            .as_keys(&dictionary)
            .unwrap()
            .extend_from_slice(&[0, 2, 0]);

        let message = buffer.into_array(None, &dict_type).unwrap_err().to_string();
        assert!(
            message.contains("dictionary key beyond bounds of dictionary: 0..2"),
            "{message}"
        );

        // Key 1 is out of range for a one entry dictionary: caught by `spill_values`
        let mut buffer = DictionaryBuffer::<i32, i32>::default();
        let dictionary = string_dictionary(vec![""]);
        buffer
            .as_keys(&dictionary)
            .unwrap()
            .extend_from_slice(&[0, 1, 0]);

        let message = buffer.spill_values().unwrap_err().to_string();
        assert!(
            message.contains("dictionary key beyond bounds of dictionary: 0..1"),
            "{message}"
        );
    }
}