parquet/arrow/arrow_writer/byte_array.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use crate::basic::Encoding;
19use crate::bloom_filter::Sbbf;
20use crate::column::writer::encoder::{ColumnValueEncoder, DataPageValues, DictionaryPage};
21use crate::data_type::{AsBytes, ByteArray, Int32Type};
22use crate::encodings::encoding::{DeltaBitPackEncoder, Encoder};
23use crate::encodings::rle::RleEncoder;
24use crate::errors::{ParquetError, Result};
25use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion};
26use crate::schema::types::ColumnDescPtr;
27use crate::util::bit_util::num_required_bits;
28use crate::util::interner::{Interner, Storage};
29use arrow_array::{
30    Array, ArrayAccessor, BinaryArray, BinaryViewArray, DictionaryArray, FixedSizeBinaryArray,
31    LargeBinaryArray, LargeStringArray, StringArray, StringViewArray,
32};
33use arrow_schema::DataType;
34
/// Downcasts `$array` to a [`DictionaryArray`] keyed by the arrow integer type
/// `$key`, then to a typed dictionary over values of type `$val`, and applies
/// `$op` to the result along with any additional arguments
///
/// Panics if `$array` is not a dictionary array of the expected key/value types
macro_rules! downcast_dict_impl {
    ($array:ident, $key:ident, $val:ident, $op:expr $(, $arg:expr)*) => {{
        $op($array
            .as_any()
            .downcast_ref::<DictionaryArray<arrow_array::types::$key>>()
            .unwrap()
            .downcast_dict::<$val>()
            .unwrap()$(, $arg)*)
    }};
}
45
/// Matches on the dictionary key [`DataType`] and delegates to
/// [`downcast_dict_impl!`] with the corresponding arrow integer key type
///
/// Unreachable for any non-integer key type (dictionary keys are always
/// integers)
macro_rules! downcast_dict_op {
    ($key_type:expr, $val:ident, $array:ident, $op:expr $(, $arg:expr)*) => {
        match $key_type.as_ref() {
            DataType::UInt8 => downcast_dict_impl!($array, UInt8Type, $val, $op$(, $arg)*),
            DataType::UInt16 => downcast_dict_impl!($array, UInt16Type, $val, $op$(, $arg)*),
            DataType::UInt32 => downcast_dict_impl!($array, UInt32Type, $val, $op$(, $arg)*),
            DataType::UInt64 => downcast_dict_impl!($array, UInt64Type, $val, $op$(, $arg)*),
            DataType::Int8 => downcast_dict_impl!($array, Int8Type, $val, $op$(, $arg)*),
            DataType::Int16 => downcast_dict_impl!($array, Int16Type, $val, $op$(, $arg)*),
            DataType::Int32 => downcast_dict_impl!($array, Int32Type, $val, $op$(, $arg)*),
            DataType::Int64 => downcast_dict_impl!($array, Int64Type, $val, $op$(, $arg)*),
            _ => unreachable!(),
        }
    };
}
61
/// Downcasts a `dyn Array` to its concrete byte-array-like type based on
/// `$data_type` and applies `$op` to the typed array (plus any extra arguments)
///
/// Supports Utf8/LargeUtf8/Utf8View, Binary/LargeBinary/BinaryView, and
/// dictionary arrays whose values are one of the byte array types; any other
/// [`DataType`] is unreachable (the schema conversion only routes byte array
/// columns here)
macro_rules! downcast_op {
    ($data_type:expr, $array:ident, $op:expr $(, $arg:expr)*) => {
        match $data_type {
            DataType::Utf8 => $op($array.as_any().downcast_ref::<StringArray>().unwrap()$(, $arg)*),
            DataType::LargeUtf8 => {
                $op($array.as_any().downcast_ref::<LargeStringArray>().unwrap()$(, $arg)*)
            }
            DataType::Utf8View => $op($array.as_any().downcast_ref::<StringViewArray>().unwrap()$(, $arg)*),
            DataType::Binary => {
                $op($array.as_any().downcast_ref::<BinaryArray>().unwrap()$(, $arg)*)
            }
            DataType::LargeBinary => {
                $op($array.as_any().downcast_ref::<LargeBinaryArray>().unwrap()$(, $arg)*)
            }
            DataType::BinaryView => {
                $op($array.as_any().downcast_ref::<BinaryViewArray>().unwrap()$(, $arg)*)
            }
            DataType::Dictionary(key, value) => match value.as_ref() {
                DataType::Utf8 => downcast_dict_op!(key, StringArray, $array, $op$(, $arg)*),
                DataType::LargeUtf8 => {
                    downcast_dict_op!(key, LargeStringArray, $array, $op$(, $arg)*)
                }
                DataType::Binary => downcast_dict_op!(key, BinaryArray, $array, $op$(, $arg)*),
                DataType::LargeBinary => {
                    downcast_dict_op!(key, LargeBinaryArray, $array, $op$(, $arg)*)
                }
                DataType::FixedSizeBinary(_) => {
                    downcast_dict_op!(key, FixedSizeBinaryArray, $array, $op$(, $arg)*)
                }
                d => unreachable!("cannot downcast {} dictionary value to byte array", d),
            },
            d => unreachable!("cannot downcast {} to byte array", d),
        }
    };
}
97
/// A fallback encoder, i.e. non-dictionary, for [`ByteArray`]
struct FallbackEncoder {
    /// The concrete encoding implementation in use
    encoder: FallbackEncoderImpl,
    /// Number of values written to the in-progress data page
    num_values: usize,
    /// Total length in bytes of the raw (unencoded) values written to the
    /// in-progress data page
    variable_length_bytes: i64,
}
104
/// The fallback encoder in use
///
/// Note: DeltaBitPackEncoder is boxed as it is rather large
enum FallbackEncoderImpl {
    /// PLAIN encoding: each value written as a u32 length prefix followed by
    /// its raw bytes
    Plain {
        buffer: Vec<u8>,
    },
    /// DELTA_LENGTH_BYTE_ARRAY: delta-encoded lengths, with the raw value
    /// bytes concatenated in `buffer`
    DeltaLength {
        buffer: Vec<u8>,
        lengths: Box<DeltaBitPackEncoder<Int32Type>>,
    },
    /// DELTA_BYTE_ARRAY: each value stored as the length of the prefix shared
    /// with the previous value plus the remaining suffix bytes
    Delta {
        /// Concatenated suffix bytes
        buffer: Vec<u8>,
        /// The previously encoded value, used to compute the shared prefix
        last_value: Vec<u8>,
        prefix_lengths: Box<DeltaBitPackEncoder<Int32Type>>,
        suffix_lengths: Box<DeltaBitPackEncoder<Int32Type>>,
    },
}
123
impl FallbackEncoder {
    /// Create the fallback encoder for the given [`ColumnDescPtr`] and [`WriterProperties`]
    ///
    /// Returns an error if an encoding other than PLAIN,
    /// DELTA_LENGTH_BYTE_ARRAY or DELTA_BYTE_ARRAY is configured for this column
    fn new(descr: &ColumnDescPtr, props: &WriterProperties) -> Result<Self> {
        // Set either main encoder or fallback encoder.
        // If no encoding was explicitly configured for this column, pick the
        // default for the writer version: PLAIN for 1.0, DELTA_BYTE_ARRAY for 2.0.
        let encoding =
            props
                .encoding(descr.path())
                .unwrap_or_else(|| match props.writer_version() {
                    WriterVersion::PARQUET_1_0 => Encoding::PLAIN,
                    WriterVersion::PARQUET_2_0 => Encoding::DELTA_BYTE_ARRAY,
                });

        let encoder = match encoding {
            Encoding::PLAIN => FallbackEncoderImpl::Plain { buffer: vec![] },
            Encoding::DELTA_LENGTH_BYTE_ARRAY => FallbackEncoderImpl::DeltaLength {
                buffer: vec![],
                lengths: Box::new(DeltaBitPackEncoder::new()),
            },
            Encoding::DELTA_BYTE_ARRAY => FallbackEncoderImpl::Delta {
                buffer: vec![],
                last_value: vec![],
                prefix_lengths: Box::new(DeltaBitPackEncoder::new()),
                suffix_lengths: Box::new(DeltaBitPackEncoder::new()),
            },
            _ => {
                return Err(general_err!(
                    "unsupported encoding {} for byte array",
                    encoding
                ))
            }
        };

        Ok(Self {
            encoder,
            num_values: 0,
            variable_length_bytes: 0,
        })
    }

    /// Encode `values` to the in-progress page
    ///
    /// Only the positions listed in `indices` are encoded
    fn encode<T>(&mut self, values: T, indices: &[usize])
    where
        T: ArrayAccessor + Copy,
        T::Item: AsRef<[u8]>,
    {
        self.num_values += indices.len();
        match &mut self.encoder {
            FallbackEncoderImpl::Plain { buffer } => {
                for idx in indices {
                    let value = values.value(*idx);
                    let value = value.as_ref();
                    // PLAIN: u32 length prefix (via `AsBytes`) followed by the raw bytes
                    buffer.extend_from_slice((value.len() as u32).as_bytes());
                    buffer.extend_from_slice(value);
                    self.variable_length_bytes += value.len() as i64;
                }
            }
            FallbackEncoderImpl::DeltaLength { buffer, lengths } => {
                for idx in indices {
                    let value = values.value(*idx);
                    let value = value.as_ref();
                    // Lengths are delta-encoded separately; raw bytes are
                    // simply concatenated into `buffer`
                    lengths.put(&[value.len() as i32]).unwrap();
                    buffer.extend_from_slice(value);
                    self.variable_length_bytes += value.len() as i64;
                }
            }
            FallbackEncoderImpl::Delta {
                buffer,
                last_value,
                prefix_lengths,
                suffix_lengths,
            } => {
                for idx in indices {
                    let value = values.value(*idx);
                    let value = value.as_ref();
                    let mut prefix_length = 0;

                    // Length of the byte prefix shared with the previous value
                    while prefix_length < last_value.len()
                        && prefix_length < value.len()
                        && last_value[prefix_length] == value[prefix_length]
                    {
                        prefix_length += 1;
                    }

                    let suffix_length = value.len() - prefix_length;

                    // Remember this value for the next prefix computation
                    last_value.clear();
                    last_value.extend_from_slice(value);

                    // Only the non-shared suffix bytes are appended to the buffer
                    buffer.extend_from_slice(&value[prefix_length..]);
                    prefix_lengths.put(&[prefix_length as i32]).unwrap();
                    suffix_lengths.put(&[suffix_length as i32]).unwrap();
                    self.variable_length_bytes += value.len() as i64;
                }
            }
        }
    }

    /// Returns an estimate of the data page size in bytes
    ///
    /// This includes:
    /// <already_written_encoded_byte_size> + <estimated_encoded_size_of_unflushed_bytes>
    fn estimated_data_page_size(&self) -> usize {
        match &self.encoder {
            FallbackEncoderImpl::Plain { buffer, .. } => buffer.len(),
            FallbackEncoderImpl::DeltaLength { buffer, lengths } => {
                buffer.len() + lengths.estimated_data_encoded_size()
            }
            FallbackEncoderImpl::Delta {
                buffer,
                prefix_lengths,
                suffix_lengths,
                ..
            } => {
                buffer.len()
                    + prefix_lengths.estimated_data_encoded_size()
                    + suffix_lengths.estimated_data_encoded_size()
            }
        }
    }

    /// Flushes the in-progress page, returning its encoded contents and
    /// resetting this encoder for the next page
    ///
    /// `min_value`/`max_value` are the page statistics computed by the caller
    fn flush_data_page(
        &mut self,
        min_value: Option<ByteArray>,
        max_value: Option<ByteArray>,
    ) -> Result<DataPageValues<ByteArray>> {
        let (buf, encoding) = match &mut self.encoder {
            FallbackEncoderImpl::Plain { buffer } => (std::mem::take(buffer), Encoding::PLAIN),
            FallbackEncoderImpl::DeltaLength { buffer, lengths } => {
                let lengths = lengths.flush_buffer()?;

                // Page layout: encoded lengths first, then the value bytes
                let mut out = Vec::with_capacity(lengths.len() + buffer.len());
                out.extend_from_slice(&lengths);
                out.extend_from_slice(buffer);
                buffer.clear();
                (out, Encoding::DELTA_LENGTH_BYTE_ARRAY)
            }
            FallbackEncoderImpl::Delta {
                buffer,
                prefix_lengths,
                suffix_lengths,
                last_value,
            } => {
                let prefix_lengths = prefix_lengths.flush_buffer()?;
                let suffix_lengths = suffix_lengths.flush_buffer()?;

                // Page layout: prefix lengths, suffix lengths, then suffix bytes
                let mut out =
                    Vec::with_capacity(prefix_lengths.len() + suffix_lengths.len() + buffer.len());
                out.extend_from_slice(&prefix_lengths);
                out.extend_from_slice(&suffix_lengths);
                out.extend_from_slice(buffer);
                buffer.clear();
                // Reset so the first value of the next page gets an empty prefix
                last_value.clear();
                (out, Encoding::DELTA_BYTE_ARRAY)
            }
        };

        // Capture value of variable_length_bytes and reset for next page
        let variable_length_bytes = Some(self.variable_length_bytes);
        self.variable_length_bytes = 0;

        Ok(DataPageValues {
            buf: buf.into(),
            num_values: std::mem::take(&mut self.num_values),
            encoding,
            min_value,
            max_value,
            variable_length_bytes,
        })
    }
}
294
/// [`Storage`] for the [`Interner`] used by [`DictEncoder`]
#[derive(Debug, Default)]
struct ByteArrayStorage {
    /// Encoded dictionary data
    ///
    /// Each interned value is stored as a u32 length prefix followed by its
    /// raw bytes, so this buffer can be emitted directly as the dictionary page
    page: Vec<u8>,

    /// Byte range within `page` (excluding the length prefix) of each interned
    /// value, indexed by dictionary key
    values: Vec<std::ops::Range<usize>>,
}
303
304impl Storage for ByteArrayStorage {
305    type Key = u64;
306    type Value = [u8];
307
308    fn get(&self, idx: Self::Key) -> &Self::Value {
309        &self.page[self.values[idx as usize].clone()]
310    }
311
312    fn push(&mut self, value: &Self::Value) -> Self::Key {
313        let key = self.values.len();
314
315        self.page.reserve(4 + value.len());
316        self.page.extend_from_slice((value.len() as u32).as_bytes());
317
318        let start = self.page.len();
319        self.page.extend_from_slice(value);
320        self.values.push(start..self.page.len());
321
322        key as u64
323    }
324
325    #[allow(dead_code)] // not used in parquet_derive, so is dead there
326    fn estimated_memory_size(&self) -> usize {
327        self.page.capacity() * std::mem::size_of::<u8>()
328            + self.values.capacity() * std::mem::size_of::<std::ops::Range<usize>>()
329    }
330}
331
/// A dictionary encoder for byte array data
#[derive(Debug, Default)]
struct DictEncoder {
    /// Interner deduplicating values into the dictionary page
    interner: Interner<ByteArrayStorage>,
    /// Dictionary keys of the in-progress data page, one per encoded value
    indices: Vec<u64>,
    /// Total length in bytes of the raw values in the in-progress data page
    variable_length_bytes: i64,
}
339
340impl DictEncoder {
341    /// Encode `values` to the in-progress page
342    fn encode<T>(&mut self, values: T, indices: &[usize])
343    where
344        T: ArrayAccessor + Copy,
345        T::Item: AsRef<[u8]>,
346    {
347        self.indices.reserve(indices.len());
348
349        for idx in indices {
350            let value = values.value(*idx);
351            let interned = self.interner.intern(value.as_ref());
352            self.indices.push(interned);
353            self.variable_length_bytes += value.as_ref().len() as i64;
354        }
355    }
356
357    fn bit_width(&self) -> u8 {
358        let length = self.interner.storage().values.len();
359        num_required_bits(length.saturating_sub(1) as u64)
360    }
361
362    fn estimated_memory_size(&self) -> usize {
363        self.interner.estimated_memory_size() + self.indices.capacity() * std::mem::size_of::<u64>()
364    }
365
366    fn estimated_data_page_size(&self) -> usize {
367        let bit_width = self.bit_width();
368        1 + RleEncoder::max_buffer_size(bit_width, self.indices.len())
369    }
370
371    fn estimated_dict_page_size(&self) -> usize {
372        self.interner.storage().page.len()
373    }
374
375    fn flush_dict_page(self) -> DictionaryPage {
376        let storage = self.interner.into_inner();
377
378        DictionaryPage {
379            buf: storage.page.into(),
380            num_values: storage.values.len(),
381            is_sorted: false,
382        }
383    }
384
385    fn flush_data_page(
386        &mut self,
387        min_value: Option<ByteArray>,
388        max_value: Option<ByteArray>,
389    ) -> DataPageValues<ByteArray> {
390        let num_values = self.indices.len();
391        let buffer_len = self.estimated_data_page_size();
392        let mut buffer = Vec::with_capacity(buffer_len);
393        buffer.push(self.bit_width());
394
395        let mut encoder = RleEncoder::new_from_buf(self.bit_width(), buffer);
396        for index in &self.indices {
397            encoder.put(*index)
398        }
399
400        self.indices.clear();
401
402        // Capture value of variable_length_bytes and reset for next page
403        let variable_length_bytes = Some(self.variable_length_bytes);
404        self.variable_length_bytes = 0;
405
406        DataPageValues {
407            buf: encoder.consume().into(),
408            num_values,
409            encoding: Encoding::RLE_DICTIONARY,
410            min_value,
411            max_value,
412            variable_length_bytes,
413        }
414    }
415}
416
/// A [`ColumnValueEncoder`] for byte array data
pub struct ByteArrayEncoder {
    /// Non-dictionary encoder, used when `dict_encoder` is `None`
    fallback: FallbackEncoder,
    /// Dictionary encoder; `None` when dictionary encoding is disabled for
    /// this column or after the dictionary page has been flushed
    dict_encoder: Option<DictEncoder>,
    /// Level of statistics to collect for this column
    statistics_enabled: EnabledStatistics,
    /// Running minimum for the in-progress page statistics
    min_value: Option<ByteArray>,
    /// Running maximum for the in-progress page statistics
    max_value: Option<ByteArray>,
    /// Optional bloom filter accumulating every written value
    bloom_filter: Option<Sbbf>,
}
425
426impl ColumnValueEncoder for ByteArrayEncoder {
427    type T = ByteArray;
428    type Values = dyn Array;
429    fn flush_bloom_filter(&mut self) -> Option<Sbbf> {
430        self.bloom_filter.take()
431    }
432
433    fn try_new(descr: &ColumnDescPtr, props: &WriterProperties) -> Result<Self>
434    where
435        Self: Sized,
436    {
437        let dictionary = props
438            .dictionary_enabled(descr.path())
439            .then(DictEncoder::default);
440
441        let fallback = FallbackEncoder::new(descr, props)?;
442
443        let bloom_filter = props
444            .bloom_filter_properties(descr.path())
445            .map(|props| Sbbf::new_with_ndv_fpp(props.ndv, props.fpp))
446            .transpose()?;
447
448        let statistics_enabled = props.statistics_enabled(descr.path());
449
450        Ok(Self {
451            fallback,
452            statistics_enabled,
453            bloom_filter,
454            dict_encoder: dictionary,
455            min_value: None,
456            max_value: None,
457        })
458    }
459
460    fn write(&mut self, _values: &Self::Values, _offset: usize, _len: usize) -> Result<()> {
461        unreachable!("should call write_gather instead")
462    }
463
464    fn write_gather(&mut self, values: &Self::Values, indices: &[usize]) -> Result<()> {
465        downcast_op!(values.data_type(), values, encode, indices, self);
466        Ok(())
467    }
468
469    fn num_values(&self) -> usize {
470        match &self.dict_encoder {
471            Some(encoder) => encoder.indices.len(),
472            None => self.fallback.num_values,
473        }
474    }
475
476    fn has_dictionary(&self) -> bool {
477        self.dict_encoder.is_some()
478    }
479
480    fn estimated_memory_size(&self) -> usize {
481        let encoder_size = match &self.dict_encoder {
482            Some(encoder) => encoder.estimated_memory_size(),
483            // For the FallbackEncoder, these unflushed bytes are already encoded.
484            // Therefore, the size should be the same as estimated_data_page_size.
485            None => self.fallback.estimated_data_page_size(),
486        };
487
488        let bloom_filter_size = self
489            .bloom_filter
490            .as_ref()
491            .map(|bf| bf.estimated_memory_size())
492            .unwrap_or_default();
493
494        let stats_size = self.min_value.as_ref().map(|v| v.len()).unwrap_or_default()
495            + self.max_value.as_ref().map(|v| v.len()).unwrap_or_default();
496
497        encoder_size + bloom_filter_size + stats_size
498    }
499
500    fn estimated_dict_page_size(&self) -> Option<usize> {
501        Some(self.dict_encoder.as_ref()?.estimated_dict_page_size())
502    }
503
504    /// Returns an estimate of the data page size in bytes
505    ///
506    /// This includes:
507    /// <already_written_encoded_byte_size> + <estimated_encoded_size_of_unflushed_bytes>
508    fn estimated_data_page_size(&self) -> usize {
509        match &self.dict_encoder {
510            Some(encoder) => encoder.estimated_data_page_size(),
511            None => self.fallback.estimated_data_page_size(),
512        }
513    }
514
515    fn flush_dict_page(&mut self) -> Result<Option<DictionaryPage>> {
516        match self.dict_encoder.take() {
517            Some(encoder) => {
518                if !encoder.indices.is_empty() {
519                    return Err(general_err!(
520                        "Must flush data pages before flushing dictionary"
521                    ));
522                }
523
524                Ok(Some(encoder.flush_dict_page()))
525            }
526            _ => Ok(None),
527        }
528    }
529
530    fn flush_data_page(&mut self) -> Result<DataPageValues<ByteArray>> {
531        let min_value = self.min_value.take();
532        let max_value = self.max_value.take();
533
534        match &mut self.dict_encoder {
535            Some(encoder) => Ok(encoder.flush_data_page(min_value, max_value)),
536            _ => self.fallback.flush_data_page(min_value, max_value),
537        }
538    }
539}
540
541/// Encodes the provided `values` and `indices` to `encoder`
542///
543/// This is a free function so it can be used with `downcast_op!`
544fn encode<T>(values: T, indices: &[usize], encoder: &mut ByteArrayEncoder)
545where
546    T: ArrayAccessor + Copy,
547    T::Item: Copy + Ord + AsRef<[u8]>,
548{
549    if encoder.statistics_enabled != EnabledStatistics::None {
550        if let Some((min, max)) = compute_min_max(values, indices.iter().cloned()) {
551            if encoder.min_value.as_ref().map_or(true, |m| m > &min) {
552                encoder.min_value = Some(min);
553            }
554
555            if encoder.max_value.as_ref().map_or(true, |m| m < &max) {
556                encoder.max_value = Some(max);
557            }
558        }
559    }
560
561    // encode the values into bloom filter if enabled
562    if let Some(bloom_filter) = &mut encoder.bloom_filter {
563        let valid = indices.iter().cloned();
564        for idx in valid {
565            bloom_filter.insert(values.value(idx).as_ref());
566        }
567    }
568
569    match &mut encoder.dict_encoder {
570        Some(dict_encoder) => dict_encoder.encode(values, indices),
571        None => encoder.fallback.encode(values, indices),
572    }
573}
574
575/// Computes the min and max for the provided array and indices
576///
577/// This is a free function so it can be used with `downcast_op!`
578fn compute_min_max<T>(
579    array: T,
580    mut valid: impl Iterator<Item = usize>,
581) -> Option<(ByteArray, ByteArray)>
582where
583    T: ArrayAccessor,
584    T::Item: Copy + Ord + AsRef<[u8]>,
585{
586    let first_idx = valid.next()?;
587
588    let first_val = array.value(first_idx);
589    let mut min = first_val;
590    let mut max = first_val;
591    for idx in valid {
592        let val = array.value(idx);
593        min = min.min(val);
594        max = max.max(val);
595    }
596    Some((min.as_ref().to_vec().into(), max.as_ref().to_vec().into()))
597}