parquet/arrow/arrow_writer/
byte_array.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::basic::Encoding;
19use crate::bloom_filter::Sbbf;
20use crate::column::writer::encoder::{ColumnValueEncoder, DataPageValues, DictionaryPage};
21use crate::data_type::{AsBytes, ByteArray, Int32Type};
22use crate::encodings::encoding::{DeltaBitPackEncoder, Encoder};
23use crate::encodings::rle::RleEncoder;
24use crate::errors::{ParquetError, Result};
25use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion};
26use crate::schema::types::ColumnDescPtr;
27use crate::util::bit_util::num_required_bits;
28use crate::util::interner::{Interner, Storage};
29use arrow_array::{
30    Array, ArrayAccessor, BinaryArray, BinaryViewArray, DictionaryArray, LargeBinaryArray,
31    LargeStringArray, StringArray, StringViewArray,
32};
33use arrow_schema::DataType;
34
/// Downcasts `$array` to a [`DictionaryArray`] with key type `$key`, then to a
/// typed dictionary over `$val` values, and invokes `$op` with the result and
/// any trailing arguments. Panics (via `unwrap`) if the downcast fails.
macro_rules! downcast_dict_impl {
    ($array:ident, $key:ident, $val:ident, $op:expr $(, $arg:expr)*) => {{
        $op($array
            .as_any()
            .downcast_ref::<DictionaryArray<arrow_array::types::$key>>()
            .unwrap()
            .downcast_dict::<$val>()
            .unwrap()$(, $arg)*)
    }};
}
45
/// Dispatches on the runtime dictionary key [`DataType`] to the matching arrow
/// key type, then delegates to [`downcast_dict_impl!`]. Integer key types only;
/// any other key type is unreachable.
macro_rules! downcast_dict_op {
    ($key_type:expr, $val:ident, $array:ident, $op:expr $(, $arg:expr)*) => {
        match $key_type.as_ref() {
            DataType::UInt8 => downcast_dict_impl!($array, UInt8Type, $val, $op$(, $arg)*),
            DataType::UInt16 => downcast_dict_impl!($array, UInt16Type, $val, $op$(, $arg)*),
            DataType::UInt32 => downcast_dict_impl!($array, UInt32Type, $val, $op$(, $arg)*),
            DataType::UInt64 => downcast_dict_impl!($array, UInt64Type, $val, $op$(, $arg)*),
            DataType::Int8 => downcast_dict_impl!($array, Int8Type, $val, $op$(, $arg)*),
            DataType::Int16 => downcast_dict_impl!($array, Int16Type, $val, $op$(, $arg)*),
            DataType::Int32 => downcast_dict_impl!($array, Int32Type, $val, $op$(, $arg)*),
            DataType::Int64 => downcast_dict_impl!($array, Int64Type, $val, $op$(, $arg)*),
            _ => unreachable!(),
        }
    };
}
61
/// Dispatches on a byte-array-like [`DataType`] (Utf8/Binary, their Large and
/// View variants, and dictionaries thereof), downcasts `$array` to the
/// concrete array type, and invokes `$op` with it plus any trailing arguments.
/// Non byte-array types are unreachable.
macro_rules! downcast_op {
    ($data_type:expr, $array:ident, $op:expr $(, $arg:expr)*) => {
        match $data_type {
            DataType::Utf8 => $op($array.as_any().downcast_ref::<StringArray>().unwrap()$(, $arg)*),
            DataType::LargeUtf8 => {
                $op($array.as_any().downcast_ref::<LargeStringArray>().unwrap()$(, $arg)*)
            }
            DataType::Utf8View => $op($array.as_any().downcast_ref::<StringViewArray>().unwrap()$(, $arg)*),
            DataType::Binary => {
                $op($array.as_any().downcast_ref::<BinaryArray>().unwrap()$(, $arg)*)
            }
            DataType::LargeBinary => {
                $op($array.as_any().downcast_ref::<LargeBinaryArray>().unwrap()$(, $arg)*)
            }
            DataType::BinaryView => {
                $op($array.as_any().downcast_ref::<BinaryViewArray>().unwrap()$(, $arg)*)
            }
            DataType::Dictionary(key, value) => match value.as_ref() {
                DataType::Utf8 => downcast_dict_op!(key, StringArray, $array, $op$(, $arg)*),
                DataType::LargeUtf8 => {
                    downcast_dict_op!(key, LargeStringArray, $array, $op$(, $arg)*)
                }
                DataType::Binary => downcast_dict_op!(key, BinaryArray, $array, $op$(, $arg)*),
                DataType::LargeBinary => {
                    downcast_dict_op!(key, LargeBinaryArray, $array, $op$(, $arg)*)
                }
                d => unreachable!("cannot downcast {} dictionary value to byte array", d),
            },
            d => unreachable!("cannot downcast {} to byte array", d),
        }
    };
}
94
/// A fallback encoder, i.e. non-dictionary, for [`ByteArray`]
struct FallbackEncoder {
    /// The concrete encoding implementation in use
    encoder: FallbackEncoderImpl,
    /// Number of values written to the in-progress data page
    num_values: usize,
    /// Total length in bytes of the raw (unencoded) values written to the
    /// in-progress data page; reported via [`DataPageValues`] on flush
    variable_length_bytes: i64,
}
101
/// The fallback encoder in use
///
/// Note: DeltaBitPackEncoder is boxed as it is rather large
enum FallbackEncoderImpl {
    /// PLAIN encoding: each value written as a u32 length followed by raw bytes
    Plain {
        buffer: Vec<u8>,
    },
    /// DELTA_LENGTH_BYTE_ARRAY: value bytes concatenated in `buffer`, with
    /// lengths delta bit-packed separately
    DeltaLength {
        buffer: Vec<u8>,
        lengths: Box<DeltaBitPackEncoder<Int32Type>>,
    },
    /// DELTA_BYTE_ARRAY: only the suffix not shared with the previous value is
    /// written to `buffer`; prefix/suffix lengths are delta bit-packed and
    /// `last_value` holds the previously written value for prefix computation
    Delta {
        buffer: Vec<u8>,
        last_value: Vec<u8>,
        prefix_lengths: Box<DeltaBitPackEncoder<Int32Type>>,
        suffix_lengths: Box<DeltaBitPackEncoder<Int32Type>>,
    },
}
120
impl FallbackEncoder {
    /// Create the fallback encoder for the given [`ColumnDescPtr`] and [`WriterProperties`]
    ///
    /// # Errors
    ///
    /// Returns an error if the configured encoding is not one of PLAIN,
    /// DELTA_LENGTH_BYTE_ARRAY or DELTA_BYTE_ARRAY
    fn new(descr: &ColumnDescPtr, props: &WriterProperties) -> Result<Self> {
        // Set either main encoder or fallback encoder.
        // When no encoding is explicitly configured for this column, the
        // default depends on the writer version: PLAIN for 1.0,
        // DELTA_BYTE_ARRAY for 2.0
        let encoding =
            props
                .encoding(descr.path())
                .unwrap_or_else(|| match props.writer_version() {
                    WriterVersion::PARQUET_1_0 => Encoding::PLAIN,
                    WriterVersion::PARQUET_2_0 => Encoding::DELTA_BYTE_ARRAY,
                });

        let encoder = match encoding {
            Encoding::PLAIN => FallbackEncoderImpl::Plain { buffer: vec![] },
            Encoding::DELTA_LENGTH_BYTE_ARRAY => FallbackEncoderImpl::DeltaLength {
                buffer: vec![],
                lengths: Box::new(DeltaBitPackEncoder::new()),
            },
            Encoding::DELTA_BYTE_ARRAY => FallbackEncoderImpl::Delta {
                buffer: vec![],
                last_value: vec![],
                prefix_lengths: Box::new(DeltaBitPackEncoder::new()),
                suffix_lengths: Box::new(DeltaBitPackEncoder::new()),
            },
            _ => {
                return Err(general_err!(
                    "unsupported encoding {} for byte array",
                    encoding
                ))
            }
        };

        Ok(Self {
            encoder,
            num_values: 0,
            variable_length_bytes: 0,
        })
    }

    /// Encode `values` to the in-progress page
    ///
    /// Only the elements of `values` selected by `indices` are written
    fn encode<T>(&mut self, values: T, indices: &[usize])
    where
        T: ArrayAccessor + Copy,
        T::Item: AsRef<[u8]>,
    {
        self.num_values += indices.len();
        match &mut self.encoder {
            // PLAIN: write a u32 length followed by the raw value bytes
            FallbackEncoderImpl::Plain { buffer } => {
                for idx in indices {
                    let value = values.value(*idx);
                    let value = value.as_ref();
                    buffer.extend_from_slice((value.len() as u32).as_bytes());
                    buffer.extend_from_slice(value);
                    self.variable_length_bytes += value.len() as i64;
                }
            }
            // DELTA_LENGTH_BYTE_ARRAY: lengths are delta encoded separately
            // from the concatenated value bytes
            FallbackEncoderImpl::DeltaLength { buffer, lengths } => {
                for idx in indices {
                    let value = values.value(*idx);
                    let value = value.as_ref();
                    lengths.put(&[value.len() as i32]).unwrap();
                    buffer.extend_from_slice(value);
                    self.variable_length_bytes += value.len() as i64;
                }
            }
            // DELTA_BYTE_ARRAY: each value is encoded as the length of the
            // prefix shared with the previous value plus the remaining suffix
            FallbackEncoderImpl::Delta {
                buffer,
                last_value,
                prefix_lengths,
                suffix_lengths,
            } => {
                for idx in indices {
                    let value = values.value(*idx);
                    let value = value.as_ref();
                    let mut prefix_length = 0;

                    // Compute the length of the common prefix with the
                    // previously written value
                    while prefix_length < last_value.len()
                        && prefix_length < value.len()
                        && last_value[prefix_length] == value[prefix_length]
                    {
                        prefix_length += 1;
                    }

                    let suffix_length = value.len() - prefix_length;

                    // Remember the full value for the next prefix computation
                    last_value.clear();
                    last_value.extend_from_slice(value);

                    // Only the non-shared suffix bytes go into the data buffer
                    buffer.extend_from_slice(&value[prefix_length..]);
                    prefix_lengths.put(&[prefix_length as i32]).unwrap();
                    suffix_lengths.put(&[suffix_length as i32]).unwrap();
                    self.variable_length_bytes += value.len() as i64;
                }
            }
        }
    }

    /// Returns an estimate of the data page size in bytes
    ///
    /// This includes:
    /// <already_written_encoded_byte_size> + <estimated_encoded_size_of_unflushed_bytes>
    fn estimated_data_page_size(&self) -> usize {
        match &self.encoder {
            FallbackEncoderImpl::Plain { buffer, .. } => buffer.len(),
            FallbackEncoderImpl::DeltaLength { buffer, lengths } => {
                buffer.len() + lengths.estimated_data_encoded_size()
            }
            FallbackEncoderImpl::Delta {
                buffer,
                prefix_lengths,
                suffix_lengths,
                ..
            } => {
                buffer.len()
                    + prefix_lengths.estimated_data_encoded_size()
                    + suffix_lengths.estimated_data_encoded_size()
            }
        }
    }

    /// Flushes the accumulated values into a [`DataPageValues`], resetting
    /// this encoder for the next page
    ///
    /// `min_value` / `max_value` are the page statistics computed by the caller
    fn flush_data_page(
        &mut self,
        min_value: Option<ByteArray>,
        max_value: Option<ByteArray>,
    ) -> Result<DataPageValues<ByteArray>> {
        let (buf, encoding) = match &mut self.encoder {
            FallbackEncoderImpl::Plain { buffer } => (std::mem::take(buffer), Encoding::PLAIN),
            FallbackEncoderImpl::DeltaLength { buffer, lengths } => {
                // Page layout: delta-encoded lengths, then the value bytes
                let lengths = lengths.flush_buffer()?;

                let mut out = Vec::with_capacity(lengths.len() + buffer.len());
                out.extend_from_slice(&lengths);
                out.extend_from_slice(buffer);
                buffer.clear();
                (out, Encoding::DELTA_LENGTH_BYTE_ARRAY)
            }
            FallbackEncoderImpl::Delta {
                buffer,
                prefix_lengths,
                suffix_lengths,
                last_value,
            } => {
                // Page layout: delta-encoded prefix lengths, then delta-encoded
                // suffix lengths, then the concatenated suffix bytes
                let prefix_lengths = prefix_lengths.flush_buffer()?;
                let suffix_lengths = suffix_lengths.flush_buffer()?;

                let mut out =
                    Vec::with_capacity(prefix_lengths.len() + suffix_lengths.len() + buffer.len());
                out.extend_from_slice(&prefix_lengths);
                out.extend_from_slice(&suffix_lengths);
                out.extend_from_slice(buffer);
                buffer.clear();
                last_value.clear();
                (out, Encoding::DELTA_BYTE_ARRAY)
            }
        };

        // Capture value of variable_length_bytes and reset for next page
        let variable_length_bytes = Some(self.variable_length_bytes);
        self.variable_length_bytes = 0;

        Ok(DataPageValues {
            buf: buf.into(),
            num_values: std::mem::take(&mut self.num_values),
            encoding,
            min_value,
            max_value,
            variable_length_bytes,
        })
    }
}
291
/// [`Storage`] for the [`Interner`] used by [`DictEncoder`]
#[derive(Debug, Default)]
struct ByteArrayStorage {
    /// Encoded dictionary data: each value stored as a u32 length prefix
    /// followed by its raw bytes
    page: Vec<u8>,

    /// Byte ranges within `page` of each interned value (excluding the length
    /// prefix), indexed by dictionary key
    values: Vec<std::ops::Range<usize>>,
}
300
301impl Storage for ByteArrayStorage {
302    type Key = u64;
303    type Value = [u8];
304
305    fn get(&self, idx: Self::Key) -> &Self::Value {
306        &self.page[self.values[idx as usize].clone()]
307    }
308
309    fn push(&mut self, value: &Self::Value) -> Self::Key {
310        let key = self.values.len();
311
312        self.page.reserve(4 + value.len());
313        self.page.extend_from_slice((value.len() as u32).as_bytes());
314
315        let start = self.page.len();
316        self.page.extend_from_slice(value);
317        self.values.push(start..self.page.len());
318
319        key as u64
320    }
321
322    #[allow(dead_code)] // not used in parquet_derive, so is dead there
323    fn estimated_memory_size(&self) -> usize {
324        self.page.capacity() * std::mem::size_of::<u8>()
325            + self.values.capacity() * std::mem::size_of::<std::ops::Range<usize>>()
326    }
327}
328
/// A dictionary encoder for byte array data
#[derive(Debug, Default)]
struct DictEncoder {
    /// Interner deduplicating values and accumulating the dictionary page
    interner: Interner<ByteArrayStorage>,
    /// Dictionary keys for the in-progress data page, in write order
    indices: Vec<u64>,
    /// Total length in bytes of the raw values written to the in-progress
    /// data page; reported via [`DataPageValues`] on flush
    variable_length_bytes: i64,
}
336
impl DictEncoder {
    /// Encode `values` to the in-progress page
    ///
    /// Each value selected by `indices` is interned, and the resulting
    /// dictionary key appended to `self.indices`
    fn encode<T>(&mut self, values: T, indices: &[usize])
    where
        T: ArrayAccessor + Copy,
        T::Item: AsRef<[u8]>,
    {
        self.indices.reserve(indices.len());

        for idx in indices {
            let value = values.value(*idx);
            let interned = self.interner.intern(value.as_ref());
            self.indices.push(interned);
            self.variable_length_bytes += value.as_ref().len() as i64;
        }
    }

    /// The number of bits required to represent the largest dictionary key
    fn bit_width(&self) -> u8 {
        let length = self.interner.storage().values.len();
        num_required_bits(length.saturating_sub(1) as u64)
    }

    /// Returns an estimate of the memory used by this encoder, in bytes
    fn estimated_memory_size(&self) -> usize {
        self.interner.estimated_memory_size() + self.indices.capacity() * std::mem::size_of::<u64>()
    }

    /// Returns an estimate of the encoded data page size, in bytes
    ///
    /// One leading byte for the bit width plus the worst-case size of the
    /// RLE-encoded keys
    fn estimated_data_page_size(&self) -> usize {
        let bit_width = self.bit_width();
        1 + RleEncoder::max_buffer_size(bit_width, self.indices.len())
    }

    /// Returns an estimate of the dictionary page size, in bytes
    fn estimated_dict_page_size(&self) -> usize {
        self.interner.storage().page.len()
    }

    /// Consumes this encoder, producing the dictionary page
    fn flush_dict_page(self) -> DictionaryPage {
        let storage = self.interner.into_inner();

        DictionaryPage {
            buf: storage.page.into(),
            num_values: storage.values.len(),
            is_sorted: false,
        }
    }

    /// Flush the in-progress data page of RLE_DICTIONARY encoded keys,
    /// resetting this encoder for the next page
    fn flush_data_page(
        &mut self,
        min_value: Option<ByteArray>,
        max_value: Option<ByteArray>,
    ) -> DataPageValues<ByteArray> {
        let num_values = self.indices.len();
        let buffer_len = self.estimated_data_page_size();
        let mut buffer = Vec::with_capacity(buffer_len);
        // The page begins with a single byte recording the key bit width
        buffer.push(self.bit_width());

        let mut encoder = RleEncoder::new_from_buf(self.bit_width(), buffer);
        for index in &self.indices {
            encoder.put(*index)
        }

        self.indices.clear();

        // Capture value of variable_length_bytes and reset for next page
        let variable_length_bytes = Some(self.variable_length_bytes);
        self.variable_length_bytes = 0;

        DataPageValues {
            buf: encoder.consume().into(),
            num_values,
            encoding: Encoding::RLE_DICTIONARY,
            min_value,
            max_value,
            variable_length_bytes,
        }
    }
}
413
/// A [`ColumnValueEncoder`] for byte array data that dictionary encodes when a
/// dictionary encoder is present, otherwise uses the [`FallbackEncoder`]
pub struct ByteArrayEncoder {
    /// Non-dictionary encoder, used when `dict_encoder` is `None`
    fallback: FallbackEncoder,
    /// Dictionary encoder; `None` when dictionary encoding is disabled or the
    /// dictionary page has been flushed
    dict_encoder: Option<DictEncoder>,
    /// Level of statistics to compute for this column
    statistics_enabled: EnabledStatistics,
    /// Minimum value seen since the last data page flush (when statistics enabled)
    min_value: Option<ByteArray>,
    /// Maximum value seen since the last data page flush (when statistics enabled)
    max_value: Option<ByteArray>,
    /// Optional bloom filter accumulating every written value
    bloom_filter: Option<Sbbf>,
}
422
impl ColumnValueEncoder for ByteArrayEncoder {
    type T = ByteArray;
    type Values = dyn Array;

    /// Returns the bloom filter, if any, leaving `None` in its place
    fn flush_bloom_filter(&mut self) -> Option<Sbbf> {
        self.bloom_filter.take()
    }

    /// Create a [`ByteArrayEncoder`] from the column description and writer
    /// properties
    fn try_new(descr: &ColumnDescPtr, props: &WriterProperties) -> Result<Self>
    where
        Self: Sized,
    {
        // Dictionary encoding is used first when enabled for this column
        let dictionary = props
            .dictionary_enabled(descr.path())
            .then(DictEncoder::default);

        let fallback = FallbackEncoder::new(descr, props)?;

        let bloom_filter = props
            .bloom_filter_properties(descr.path())
            .map(|props| Sbbf::new_with_ndv_fpp(props.ndv, props.fpp))
            .transpose()?;

        let statistics_enabled = props.statistics_enabled(descr.path());

        Ok(Self {
            fallback,
            statistics_enabled,
            bloom_filter,
            dict_encoder: dictionary,
            min_value: None,
            max_value: None,
        })
    }

    /// Not supported for byte arrays; callers must use [`Self::write_gather`]
    fn write(&mut self, _values: &Self::Values, _offset: usize, _len: usize) -> Result<()> {
        unreachable!("should call write_gather instead")
    }

    /// Encode the elements of `values` at the given `indices`
    fn write_gather(&mut self, values: &Self::Values, indices: &[usize]) -> Result<()> {
        // Dispatch to the free function `encode` with the concretely typed array
        downcast_op!(values.data_type(), values, encode, indices, self);
        Ok(())
    }

    /// Number of values written to the in-progress data page
    fn num_values(&self) -> usize {
        match &self.dict_encoder {
            Some(encoder) => encoder.indices.len(),
            None => self.fallback.num_values,
        }
    }

    fn has_dictionary(&self) -> bool {
        self.dict_encoder.is_some()
    }

    /// Returns an estimate of the total memory used by this encoder, including
    /// the bloom filter and buffered statistics
    fn estimated_memory_size(&self) -> usize {
        let encoder_size = match &self.dict_encoder {
            Some(encoder) => encoder.estimated_memory_size(),
            // For the FallbackEncoder, these unflushed bytes are already encoded.
            // Therefore, the size should be the same as estimated_data_page_size.
            None => self.fallback.estimated_data_page_size(),
        };

        let bloom_filter_size = self
            .bloom_filter
            .as_ref()
            .map(|bf| bf.estimated_memory_size())
            .unwrap_or_default();

        let stats_size = self.min_value.as_ref().map(|v| v.len()).unwrap_or_default()
            + self.max_value.as_ref().map(|v| v.len()).unwrap_or_default();

        encoder_size + bloom_filter_size + stats_size
    }

    /// Returns an estimate of the dictionary page size in bytes, or `None` if
    /// dictionary encoding is not in use
    fn estimated_dict_page_size(&self) -> Option<usize> {
        Some(self.dict_encoder.as_ref()?.estimated_dict_page_size())
    }

    /// Returns an estimate of the data page size in bytes
    ///
    /// This includes:
    /// <already_written_encoded_byte_size> + <estimated_encoded_size_of_unflushed_bytes>
    fn estimated_data_page_size(&self) -> usize {
        match &self.dict_encoder {
            Some(encoder) => encoder.estimated_data_page_size(),
            None => self.fallback.estimated_data_page_size(),
        }
    }

    /// Flush the dictionary page, consuming the dictionary encoder so that
    /// subsequent writes use the fallback encoder
    ///
    /// # Errors
    ///
    /// Returns an error if values are buffered for an unflushed data page
    fn flush_dict_page(&mut self) -> Result<Option<DictionaryPage>> {
        match self.dict_encoder.take() {
            Some(encoder) => {
                if !encoder.indices.is_empty() {
                    return Err(general_err!(
                        "Must flush data pages before flushing dictionary"
                    ));
                }

                Ok(Some(encoder.flush_dict_page()))
            }
            _ => Ok(None),
        }
    }

    /// Flush the in-progress data page, including the min/max statistics
    /// accumulated since the last flush
    fn flush_data_page(&mut self) -> Result<DataPageValues<ByteArray>> {
        let min_value = self.min_value.take();
        let max_value = self.max_value.take();

        match &mut self.dict_encoder {
            Some(encoder) => Ok(encoder.flush_data_page(min_value, max_value)),
            _ => self.fallback.flush_data_page(min_value, max_value),
        }
    }
}
537
538/// Encodes the provided `values` and `indices` to `encoder`
539///
540/// This is a free function so it can be used with `downcast_op!`
541fn encode<T>(values: T, indices: &[usize], encoder: &mut ByteArrayEncoder)
542where
543    T: ArrayAccessor + Copy,
544    T::Item: Copy + Ord + AsRef<[u8]>,
545{
546    if encoder.statistics_enabled != EnabledStatistics::None {
547        if let Some((min, max)) = compute_min_max(values, indices.iter().cloned()) {
548            if encoder.min_value.as_ref().map_or(true, |m| m > &min) {
549                encoder.min_value = Some(min);
550            }
551
552            if encoder.max_value.as_ref().map_or(true, |m| m < &max) {
553                encoder.max_value = Some(max);
554            }
555        }
556    }
557
558    // encode the values into bloom filter if enabled
559    if let Some(bloom_filter) = &mut encoder.bloom_filter {
560        let valid = indices.iter().cloned();
561        for idx in valid {
562            bloom_filter.insert(values.value(idx).as_ref());
563        }
564    }
565
566    match &mut encoder.dict_encoder {
567        Some(dict_encoder) => dict_encoder.encode(values, indices),
568        None => encoder.fallback.encode(values, indices),
569    }
570}
571
572/// Computes the min and max for the provided array and indices
573///
574/// This is a free function so it can be used with `downcast_op!`
575fn compute_min_max<T>(
576    array: T,
577    mut valid: impl Iterator<Item = usize>,
578) -> Option<(ByteArray, ByteArray)>
579where
580    T: ArrayAccessor,
581    T::Item: Copy + Ord + AsRef<[u8]>,
582{
583    let first_idx = valid.next()?;
584
585    let first_val = array.value(first_idx);
586    let mut min = first_val;
587    let mut max = first_val;
588    for idx in valid {
589        let val = array.value(idx);
590        min = min.min(val);
591        max = max.max(val);
592    }
593    Some((min.as_ref().to_vec().into(), max.as_ref().to_vec().into()))
594}