Skip to main content

parquet/arrow/buffer/
dictionary_buffer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::arrow::buffer::offset_buffer::OffsetBuffer;
19use crate::arrow::record_reader::buffer::ValuesBuffer;
20use crate::errors::{ParquetError, Result};
21use arrow_array::{Array, GenericByteArray, downcast_integer};
22use arrow_array::{
23    ArrayRef, FixedSizeBinaryArray, OffsetSizeTrait,
24    builder::{FixedSizeBinaryDictionaryBuilder, GenericByteDictionaryBuilder},
25    cast::AsArray,
26    make_array,
27    types::{ArrowDictionaryKeyType, ByteArrayType},
28};
29use arrow_buffer::{ArrowNativeType, Buffer};
30use arrow_data::ArrayDataBuilder;
31use arrow_schema::DataType as ArrowType;
32use std::sync::Arc;
33
34/// An array of variable length byte arrays that are potentially dictionary encoded
35/// and can be converted into a corresponding [`ArrayRef`]
36pub enum DictionaryBuffer<K: ArrowNativeType, V: OffsetSizeTrait> {
37    Dict { keys: Vec<K>, values: ArrayRef },
38    Values { values: OffsetBuffer<V> },
39}
40
41impl<K: ArrowNativeType + Ord, V: OffsetSizeTrait> DictionaryBuffer<K, V> {
42    #[allow(unused)]
43    pub fn len(&self) -> usize {
44        match self {
45            Self::Dict { keys, .. } => keys.len(),
46            Self::Values { values } => values.len(),
47        }
48    }
49
50    /// Returns a mutable reference to a keys array
51    ///
52    /// Returns None if the dictionary needs to be recomputed
53    ///
54    /// # Panic
55    ///
56    /// Panics if the dictionary is too large for `K`
57    pub fn as_keys(&mut self, dictionary: &ArrayRef) -> Option<&mut Vec<K>> {
58        assert!(K::from_usize(dictionary.len()).is_some());
59
60        match self {
61            Self::Dict { keys, values } => {
62                // Need to discard fat pointer for equality check
63                // - https://stackoverflow.com/a/67114787
64                // - https://github.com/rust-lang/rust/issues/46139
65                let values_ptr = values.as_ref() as *const _ as *const ();
66                let dict_ptr = dictionary.as_ref() as *const _ as *const ();
67                if values_ptr == dict_ptr {
68                    Some(keys)
69                } else if keys.is_empty() {
70                    *values = Arc::clone(dictionary);
71                    Some(keys)
72                } else {
73                    None
74                }
75            }
76            Self::Values { values } if values.is_empty() => {
77                *self = Self::Dict {
78                    keys: Default::default(),
79                    values: Arc::clone(dictionary),
80                };
81                match self {
82                    Self::Dict { keys, .. } => Some(keys),
83                    _ => unreachable!(),
84                }
85            }
86            _ => None,
87        }
88    }
89
90    /// Returns a mutable reference to a values array
91    ///
92    /// If this is currently dictionary encoded, this will convert from the
93    /// dictionary encoded representation
94    pub fn spill_values(&mut self) -> Result<&mut OffsetBuffer<V>> {
95        match self {
96            Self::Values { values } => Ok(values),
97            Self::Dict { keys, values } => {
98                let mut spilled = OffsetBuffer::with_capacity(0);
99                let data = values.to_data();
100                let dict_buffers = data.buffers();
101                let dict_offsets = dict_buffers[0].typed_data::<V>();
102                let dict_values = dict_buffers[1].as_slice();
103
104                if values.is_empty() {
105                    // If dictionary is empty, zero pad offsets
106                    spilled.offsets.resize(keys.len() + 1, V::default());
107                } else {
108                    // Note: at this point null positions will have arbitrary dictionary keys
109                    // and this will hydrate them to the corresponding byte array. This is
110                    // likely sub-optimal, as we would prefer zero length null "slots", but
111                    // spilling is already a degenerate case and so it is unclear if this is
112                    // worth optimising for, e.g. by keeping a null mask around
113                    spilled.extend_from_dictionary(keys.as_slice(), dict_offsets, dict_values)?;
114                }
115
116                *self = Self::Values { values: spilled };
117                match self {
118                    Self::Values { values } => Ok(values),
119                    _ => unreachable!(),
120                }
121            }
122        }
123    }
124
125    /// Converts this into an [`ArrayRef`] with the provided `data_type` and `null_buffer`
126    pub fn into_array(
127        self,
128        null_buffer: Option<Buffer>,
129        data_type: &ArrowType,
130    ) -> Result<ArrayRef> {
131        assert!(matches!(data_type, ArrowType::Dictionary(_, _)));
132
133        match self {
134            Self::Dict { keys, values } => {
135                // Validate keys unless dictionary is empty
136                if !values.is_empty() {
137                    let min = K::from_usize(0).unwrap();
138                    let max = K::from_usize(values.len()).unwrap();
139
140                    // using copied and fold gets auto-vectorized since rust 1.70
141                    // all/any would allow early exit on invalid values
142                    // but in the happy case all values have to be checked anyway
143                    if !keys
144                        .as_slice()
145                        .iter()
146                        .copied()
147                        .fold(true, |a, x| a && x >= min && x < max)
148                    {
149                        return Err(general_err!(
150                            "dictionary key beyond bounds of dictionary: 0..{}",
151                            values.len()
152                        ));
153                    }
154                }
155
156                let ArrowType::Dictionary(_, value_type) = data_type else {
157                    unreachable!()
158                };
159                let values = if let ArrowType::FixedSizeBinary(size) = **value_type {
160                    let binary = values.as_binary::<i32>();
161                    Arc::new(FixedSizeBinaryArray::new(
162                        size,
163                        binary.values().clone(),
164                        binary.nulls().cloned(),
165                    )) as _
166                } else {
167                    values
168                };
169
170                let builder = ArrayDataBuilder::new(data_type.clone())
171                    .len(keys.len())
172                    .add_buffer(Buffer::from_vec(keys))
173                    .add_child_data(values.into_data())
174                    .null_bit_buffer(null_buffer);
175
176                let data = match cfg!(debug_assertions) {
177                    true => builder.build().unwrap(),
178                    false => unsafe { builder.build_unchecked() },
179                };
180
181                Ok(make_array(data))
182            }
183            Self::Values { values } => {
184                let (key_type, value_type) = match data_type {
185                    ArrowType::Dictionary(k, v) => (k, v.as_ref().clone()),
186                    _ => unreachable!(),
187                };
188
189                let array = values.into_array(null_buffer, value_type);
190                pack_values(key_type, &array)
191            }
192        }
193    }
194}
195
196impl<K: ArrowNativeType, V: OffsetSizeTrait> ValuesBuffer for DictionaryBuffer<K, V> {
197    fn with_capacity(capacity: usize) -> Self {
198        Self::Values {
199            values: OffsetBuffer::with_capacity(capacity),
200        }
201    }
202
203    fn pad_nulls(
204        &mut self,
205        read_offset: usize,
206        values_read: usize,
207        levels_read: usize,
208        valid_mask: &[u8],
209    ) {
210        match self {
211            Self::Dict { keys, .. } => {
212                keys.resize(read_offset + levels_read, K::default());
213                keys.pad_nulls(read_offset, values_read, levels_read, valid_mask)
214            }
215            Self::Values { values, .. } => {
216                values.pad_nulls(read_offset, values_read, levels_read, valid_mask)
217            }
218        }
219    }
220}
221
// Re-encodes `$array` as a dictionary array with key type `$k`, dispatching on the
// array's value type. Used as the per-key-type callback of `downcast_integer!` in
// `pack_values`; any value type not listed below hits `unreachable!()` and panics.
macro_rules! dict_helper {
    ($k:ty, $array:ident) => {
        match $array.data_type() {
            ArrowType::FixedSizeBinary(_) => {
                pack_fixed_values_impl::<$k>($array.as_fixed_size_binary())
            }
            ArrowType::Binary => pack_values_impl::<$k, _>($array.as_binary::<i32>()),
            ArrowType::LargeBinary => pack_values_impl::<$k, _>($array.as_binary::<i64>()),
            ArrowType::Utf8 => pack_values_impl::<$k, _>($array.as_string::<i32>()),
            ArrowType::LargeUtf8 => pack_values_impl::<$k, _>($array.as_string::<i64>()),
            _ => unreachable!(),
        }
    };
}
236
/// Re-encodes plain (non-dictionary) `values` as a dictionary array whose keys have
/// the integer type `key_type`.
///
/// `downcast_integer!` selects the Arrow key type matching `key_type` and hands it,
/// together with `values`, to `dict_helper!`, which dispatches on the value type.
/// Panics (via `unreachable!`) if `key_type` is not an integer type.
fn pack_values(key_type: &ArrowType, values: &ArrayRef) -> Result<ArrayRef> {
    downcast_integer! {
        key_type => (dict_helper, values),
            _ => unreachable!(),
    }
}
243
244fn pack_values_impl<K: ArrowDictionaryKeyType, T: ByteArrayType>(
245    array: &GenericByteArray<T>,
246) -> Result<ArrayRef> {
247    let mut builder = GenericByteDictionaryBuilder::<K, T>::with_capacity(array.len(), 1024, 1024);
248    for x in array {
249        match x {
250            Some(x) => builder.append_value(x),
251            None => builder.append_null(),
252        }
253    }
254    let raw = builder.finish();
255    Ok(Arc::new(raw))
256}
257
258fn pack_fixed_values_impl<K: ArrowDictionaryKeyType>(
259    array: &FixedSizeBinaryArray,
260) -> Result<ArrayRef> {
261    let mut builder = FixedSizeBinaryDictionaryBuilder::<K>::with_capacity(
262        array.len(),
263        1024,
264        array.value_length(),
265    );
266    for x in array {
267        match x {
268            Some(x) => builder.append_value(x),
269            None => builder.append_null(),
270        }
271    }
272    let raw = builder.finish();
273    Ok(Arc::new(raw))
274}
275
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::compute::cast;
    use arrow_array::StringArray;

    /// Walks a single buffer through its lifecycle: reading keys against a
    /// dictionary, spilling to plain values mid-batch, converting to an array,
    /// and adopting new dictionaries once the buffer has been reset.
    #[test]
    fn test_dictionary_buffer() {
        let dict_type =
            ArrowType::Dictionary(Box::new(ArrowType::Int32), Box::new(ArrowType::Utf8));

        let d1: ArrayRef = Arc::new(StringArray::from(vec!["hello", "world", "", "a", "b"]));

        let mut buffer = DictionaryBuffer::<i32, i32>::with_capacity(0);

        // Read some data preserving the dictionary
        let values = &[1, 0, 3, 2, 4];
        buffer.as_keys(&d1).unwrap().extend_from_slice(values);

        // 8 slots whose 5 valid positions (2, 3, 5, 6, 7) receive the 5 keys above
        let mut valid = vec![false, false, true, true, false, true, true, true];
        let valid_buffer = Buffer::from_iter(valid.iter().cloned());
        buffer.pad_nulls(0, values.len(), valid.len(), valid_buffer.as_slice());

        // Read some data not preserving the dictionary

        let values = buffer.spill_values().unwrap();
        let read_offset = values.len();
        values.try_push("bingo".as_bytes(), false).unwrap();
        values.try_push("bongo".as_bytes(), false).unwrap();

        // 5 more slots (8..13); positions 10 and 12 receive the 2 values pushed above
        valid.extend_from_slice(&[false, false, true, false, true]);
        let null_buffer = Buffer::from_iter(valid.iter().cloned());
        buffer.pad_nulls(read_offset, 2, 5, null_buffer.as_slice());

        assert_eq!(buffer.len(), 13);
        let split = std::mem::replace(&mut buffer, DictionaryBuffer::with_capacity(0));

        let array = split.into_array(Some(null_buffer), &dict_type).unwrap();
        assert_eq!(array.data_type(), &dict_type);

        let strings = cast(&array, &ArrowType::Utf8).unwrap();
        let strings = strings.as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(
            strings.iter().collect::<Vec<_>>(),
            vec![
                None,
                None,
                Some("world"),
                Some("hello"),
                None,
                Some("a"),
                Some(""),
                Some("b"),
                None,
                None,
                Some("bingo"),
                None,
                Some("bongo")
            ]
        );

        // Can recreate with new dictionary as values is empty
        assert!(matches!(&buffer, DictionaryBuffer::Values { .. }));
        assert_eq!(buffer.len(), 0);
        let d2 = Arc::new(StringArray::from(vec!["bingo", ""])) as ArrayRef;
        buffer
            .as_keys(&d2)
            .unwrap()
            .extend_from_slice(&[0, 1, 0, 1]);

        let array = std::mem::replace(&mut buffer, DictionaryBuffer::with_capacity(0))
            .into_array(None, &dict_type)
            .unwrap();
        assert_eq!(array.data_type(), &dict_type);

        let strings = cast(&array, &ArrowType::Utf8).unwrap();
        let strings = strings.as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(
            strings.iter().collect::<Vec<_>>(),
            vec![Some("bingo"), Some(""), Some("bingo"), Some("")]
        );

        // Can again recreate with a new dictionary: `mem::replace` above reset the
        // buffer to empty plain values (this is the same path as the d2 case, not
        // the "dictionary with no keys" path)
        assert!(matches!(&buffer, DictionaryBuffer::Values { .. }));
        assert_eq!(buffer.len(), 0);
        let d3 = Arc::new(StringArray::from(vec!["bongo"])) as ArrayRef;
        buffer.as_keys(&d3).unwrap().extend_from_slice(&[0, 0]);

        // Cannot change dictionary as keys not empty
        let d4 = Arc::new(StringArray::from(vec!["bananas"])) as ArrayRef;
        assert!(buffer.as_keys(&d4).is_none());
    }

    /// Out-of-range keys must be reported as errors both when building the array
    /// directly (`into_array`) and when expanding keys (`spill_values`).
    #[test]
    fn test_validates_keys() {
        let dict_type =
            ArrowType::Dictionary(Box::new(ArrowType::Int32), Box::new(ArrowType::Utf8));

        // Key 2 is out of range for a 2-entry dictionary
        let mut buffer = DictionaryBuffer::<i32, i32>::with_capacity(0);
        let d = Arc::new(StringArray::from(vec!["", "f"])) as ArrayRef;
        buffer.as_keys(&d).unwrap().extend_from_slice(&[0, 2, 0]);

        let err = buffer.into_array(None, &dict_type).unwrap_err().to_string();
        assert!(
            err.contains("dictionary key beyond bounds of dictionary: 0..2"),
            "{}",
            err
        );

        // Key 1 is out of range for a 1-entry dictionary
        let mut buffer = DictionaryBuffer::<i32, i32>::with_capacity(0);
        let d = Arc::new(StringArray::from(vec![""])) as ArrayRef;
        buffer.as_keys(&d).unwrap().extend_from_slice(&[0, 1, 0]);

        let err = buffer.spill_values().unwrap_err().to_string();
        assert!(
            err.contains("dictionary key beyond bounds of dictionary: 0..1"),
            "{}",
            err
        );
    }
}