// Source: parquet/arrow/arrow_writer/byte_array.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::basic::Encoding;
19use crate::bloom_filter::Sbbf;
20use crate::column::writer::encoder::{
21    ColumnValueEncoder, DataPageValues, DictionaryPage, create_bloom_filter,
22};
23use crate::data_type::{AsBytes, ByteArray, Int32Type};
24use crate::encodings::encoding::{DeltaBitPackEncoder, Encoder};
25use crate::encodings::rle::RleEncoder;
26use crate::errors::{ParquetError, Result};
27use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion};
28use crate::geospatial::accumulator::{GeoStatsAccumulator, try_new_geo_stats_accumulator};
29use crate::geospatial::statistics::GeospatialStatistics;
30use crate::schema::types::ColumnDescPtr;
31use crate::util::bit_util::num_required_bits;
32use crate::util::interner::{Interner, Storage};
33use arrow_array::{
34    Array, ArrayAccessor, BinaryArray, BinaryViewArray, DictionaryArray, FixedSizeBinaryArray,
35    LargeBinaryArray, LargeStringArray, StringArray, StringViewArray,
36};
37use arrow_schema::DataType;
38
/// Downcasts `$array` to a `DictionaryArray` with concrete key type `$key`,
/// resolves its values as `$val`, and invokes `$op` with the typed dictionary
/// (plus any trailing arguments).
///
/// Panics (via `unwrap`) if the array is not the expected dictionary type;
/// callers are expected to have already matched on the `DataType`.
macro_rules! downcast_dict_impl {
    ($array:ident, $key:ident, $val:ident, $op:expr $(, $arg:expr)*) => {{
        $op($array
            .as_any()
            .downcast_ref::<DictionaryArray<arrow_array::types::$key>>()
            .unwrap()
            .downcast_dict::<$val>()
            .unwrap()$(, $arg)*)
    }};
}
49
/// Dispatches on the dictionary key `DataType`, invoking [`downcast_dict_impl!`]
/// with the matching arrow key type.
///
/// Arrow dictionary keys are always integer types, so any other key type is
/// unreachable here.
macro_rules! downcast_dict_op {
    ($key_type:expr, $val:ident, $array:ident, $op:expr $(, $arg:expr)*) => {
        match $key_type.as_ref() {
            DataType::UInt8 => downcast_dict_impl!($array, UInt8Type, $val, $op$(, $arg)*),
            DataType::UInt16 => downcast_dict_impl!($array, UInt16Type, $val, $op$(, $arg)*),
            DataType::UInt32 => downcast_dict_impl!($array, UInt32Type, $val, $op$(, $arg)*),
            DataType::UInt64 => downcast_dict_impl!($array, UInt64Type, $val, $op$(, $arg)*),
            DataType::Int8 => downcast_dict_impl!($array, Int8Type, $val, $op$(, $arg)*),
            DataType::Int16 => downcast_dict_impl!($array, Int16Type, $val, $op$(, $arg)*),
            DataType::Int32 => downcast_dict_impl!($array, Int32Type, $val, $op$(, $arg)*),
            DataType::Int64 => downcast_dict_impl!($array, Int64Type, $val, $op$(, $arg)*),
            _ => unreachable!(),
        }
    };
}
65
/// Downcasts a `dyn Array` to its concrete byte-array-like type based on
/// `$data_type` and invokes `$op` with the typed array (plus any trailing
/// arguments).
///
/// Supports the utf8/binary families (including the view types) and
/// dictionaries thereof; any other type is a logic error upstream and panics.
macro_rules! downcast_op {
    ($data_type:expr, $array:ident, $op:expr $(, $arg:expr)*) => {
        match $data_type {
            DataType::Utf8 => $op($array.as_any().downcast_ref::<StringArray>().unwrap()$(, $arg)*),
            DataType::LargeUtf8 => {
                $op($array.as_any().downcast_ref::<LargeStringArray>().unwrap()$(, $arg)*)
            }
            DataType::Utf8View => $op($array.as_any().downcast_ref::<StringViewArray>().unwrap()$(, $arg)*),
            DataType::Binary => {
                $op($array.as_any().downcast_ref::<BinaryArray>().unwrap()$(, $arg)*)
            }
            DataType::LargeBinary => {
                $op($array.as_any().downcast_ref::<LargeBinaryArray>().unwrap()$(, $arg)*)
            }
            DataType::BinaryView => {
                $op($array.as_any().downcast_ref::<BinaryViewArray>().unwrap()$(, $arg)*)
            }
            // Dictionary arrays additionally dispatch on the key type
            DataType::Dictionary(key, value) => match value.as_ref() {
                DataType::Utf8 => downcast_dict_op!(key, StringArray, $array, $op$(, $arg)*),
                DataType::LargeUtf8 => {
                    downcast_dict_op!(key, LargeStringArray, $array, $op$(, $arg)*)
                }
                DataType::Binary => downcast_dict_op!(key, BinaryArray, $array, $op$(, $arg)*),
                DataType::LargeBinary => {
                    downcast_dict_op!(key, LargeBinaryArray, $array, $op$(, $arg)*)
                }
                DataType::FixedSizeBinary(_) => {
                    downcast_dict_op!(key, FixedSizeBinaryArray, $array, $op$(, $arg)*)
                }
                d => unreachable!("cannot downcast {} dictionary value to byte array", d),
            },
            d => unreachable!("cannot downcast {} to byte array", d),
        }
    };
}
101
/// A fallback encoder, i.e. non-dictionary, for [`ByteArray`]
struct FallbackEncoder {
    /// The concrete non-dictionary encoding in use
    encoder: FallbackEncoderImpl,
    /// Number of values written to the in-progress data page
    num_values: usize,
    /// Total length in bytes of the variable-length values written to the
    /// in-progress page
    variable_length_bytes: i64,
}
108
/// The fallback encoder in use
///
/// Note: DeltaBitPackEncoder is boxed as it is rather large
enum FallbackEncoderImpl {
    /// PLAIN encoding: each value written as a 4-byte length followed by its bytes
    Plain {
        buffer: Vec<u8>,
    },
    /// DELTA_LENGTH_BYTE_ARRAY: value lengths delta-packed separately from the
    /// concatenated value bytes
    DeltaLength {
        buffer: Vec<u8>,
        lengths: Box<DeltaBitPackEncoder<Int32Type>>,
    },
    /// DELTA_BYTE_ARRAY: prefix compression against the previous value, with
    /// delta-packed prefix and suffix lengths
    Delta {
        /// Concatenated value suffixes (bytes not shared with the prior value)
        buffer: Vec<u8>,
        /// The most recently written value, used to compute the shared prefix
        last_value: Vec<u8>,
        prefix_lengths: Box<DeltaBitPackEncoder<Int32Type>>,
        suffix_lengths: Box<DeltaBitPackEncoder<Int32Type>>,
    },
}
127
impl FallbackEncoder {
    /// Create the fallback encoder for the given [`ColumnDescPtr`] and [`WriterProperties`]
    ///
    /// # Errors
    ///
    /// Returns an error if the encoding configured for the column is not a
    /// supported byte-array encoding.
    fn new(descr: &ColumnDescPtr, props: &WriterProperties) -> Result<Self> {
        // Set either main encoder or fallback encoder.
        // When no encoding is explicitly configured for this column, the default
        // depends on the writer version: PLAIN for 1.0, DELTA_BYTE_ARRAY for 2.0.
        let encoding =
            props
                .encoding(descr.path())
                .unwrap_or_else(|| match props.writer_version() {
                    WriterVersion::PARQUET_1_0 => Encoding::PLAIN,
                    WriterVersion::PARQUET_2_0 => Encoding::DELTA_BYTE_ARRAY,
                });

        let encoder = match encoding {
            Encoding::PLAIN => FallbackEncoderImpl::Plain { buffer: vec![] },
            Encoding::DELTA_LENGTH_BYTE_ARRAY => FallbackEncoderImpl::DeltaLength {
                buffer: vec![],
                lengths: Box::new(DeltaBitPackEncoder::new()),
            },
            Encoding::DELTA_BYTE_ARRAY => FallbackEncoderImpl::Delta {
                buffer: vec![],
                last_value: vec![],
                prefix_lengths: Box::new(DeltaBitPackEncoder::new()),
                suffix_lengths: Box::new(DeltaBitPackEncoder::new()),
            },
            _ => {
                return Err(general_err!(
                    "unsupported encoding {} for byte array",
                    encoding
                ));
            }
        };

        Ok(Self {
            encoder,
            num_values: 0,
            variable_length_bytes: 0,
        })
    }

    /// Encode `values` to the in-progress page
    ///
    /// `indices` selects which elements of `values` are encoded, in order.
    fn encode<T>(&mut self, values: T, indices: &[usize])
    where
        T: ArrayAccessor + Copy,
        T::Item: AsRef<[u8]>,
    {
        self.num_values += indices.len();
        match &mut self.encoder {
            // PLAIN: 4-byte length followed by the raw value bytes
            FallbackEncoderImpl::Plain { buffer } => {
                for idx in indices {
                    let value = values.value(*idx);
                    let value = value.as_ref();
                    buffer.extend_from_slice((value.len() as u32).as_bytes());
                    buffer.extend_from_slice(value);
                    self.variable_length_bytes += value.len() as i64;
                }
            }
            // DELTA_LENGTH_BYTE_ARRAY: lengths are delta-packed separately;
            // value bytes are concatenated without per-value prefixes
            FallbackEncoderImpl::DeltaLength { buffer, lengths } => {
                for idx in indices {
                    let value = values.value(*idx);
                    let value = value.as_ref();
                    lengths.put(&[value.len() as i32]).unwrap();
                    buffer.extend_from_slice(value);
                    self.variable_length_bytes += value.len() as i64;
                }
            }
            // DELTA_BYTE_ARRAY: each value is stored as the length of the prefix
            // it shares with the previous value plus the remaining suffix bytes
            FallbackEncoderImpl::Delta {
                buffer,
                last_value,
                prefix_lengths,
                suffix_lengths,
            } => {
                for idx in indices {
                    let value = values.value(*idx);
                    let value = value.as_ref();
                    let mut prefix_length = 0;

                    // Count leading bytes shared with the previously written value
                    while prefix_length < last_value.len()
                        && prefix_length < value.len()
                        && last_value[prefix_length] == value[prefix_length]
                    {
                        prefix_length += 1;
                    }

                    let suffix_length = value.len() - prefix_length;

                    // Remember the full value for the next prefix computation
                    last_value.clear();
                    last_value.extend_from_slice(value);

                    // Only the non-shared suffix is appended to the page buffer
                    buffer.extend_from_slice(&value[prefix_length..]);
                    prefix_lengths.put(&[prefix_length as i32]).unwrap();
                    suffix_lengths.put(&[suffix_length as i32]).unwrap();
                    self.variable_length_bytes += value.len() as i64;
                }
            }
        }
    }

    /// Returns an estimate of the data page size in bytes
    ///
    /// This includes:
    /// <already_written_encoded_byte_size> + <estimated_encoded_size_of_unflushed_bytes>
    fn estimated_data_page_size(&self) -> usize {
        match &self.encoder {
            FallbackEncoderImpl::Plain { buffer, .. } => buffer.len(),
            FallbackEncoderImpl::DeltaLength { buffer, lengths } => {
                buffer.len() + lengths.estimated_data_encoded_size()
            }
            FallbackEncoderImpl::Delta {
                buffer,
                prefix_lengths,
                suffix_lengths,
                ..
            } => {
                buffer.len()
                    + prefix_lengths.estimated_data_encoded_size()
                    + suffix_lengths.estimated_data_encoded_size()
            }
        }
    }

    /// Flushes the in-progress page into [`DataPageValues`], resetting this
    /// encoder for the next page
    ///
    /// `min_value` / `max_value` are the page statistics computed by the caller.
    fn flush_data_page(
        &mut self,
        min_value: Option<ByteArray>,
        max_value: Option<ByteArray>,
    ) -> Result<DataPageValues<ByteArray>> {
        let (buf, encoding) = match &mut self.encoder {
            FallbackEncoderImpl::Plain { buffer } => (std::mem::take(buffer), Encoding::PLAIN),
            FallbackEncoderImpl::DeltaLength { buffer, lengths } => {
                let lengths = lengths.flush_buffer()?;

                // Page layout: delta-packed lengths followed by the value bytes
                let mut out = Vec::with_capacity(lengths.len() + buffer.len());
                out.extend_from_slice(&lengths);
                out.extend_from_slice(buffer);
                buffer.clear();
                (out, Encoding::DELTA_LENGTH_BYTE_ARRAY)
            }
            FallbackEncoderImpl::Delta {
                buffer,
                prefix_lengths,
                suffix_lengths,
                last_value,
            } => {
                let prefix_lengths = prefix_lengths.flush_buffer()?;
                let suffix_lengths = suffix_lengths.flush_buffer()?;

                // Page layout: delta-packed prefix lengths, then delta-packed
                // suffix lengths, then the concatenated suffix bytes
                let mut out =
                    Vec::with_capacity(prefix_lengths.len() + suffix_lengths.len() + buffer.len());
                out.extend_from_slice(&prefix_lengths);
                out.extend_from_slice(&suffix_lengths);
                out.extend_from_slice(buffer);
                buffer.clear();
                // Reset prefix state so the next page starts from scratch
                last_value.clear();
                (out, Encoding::DELTA_BYTE_ARRAY)
            }
        };

        // Capture value of variable_length_bytes and reset for next page
        let variable_length_bytes = Some(self.variable_length_bytes);
        self.variable_length_bytes = 0;

        Ok(DataPageValues {
            buf: buf.into(),
            num_values: std::mem::take(&mut self.num_values),
            encoding,
            min_value,
            max_value,
            variable_length_bytes,
        })
    }
}
298
/// [`Storage`] for the [`Interner`] used by [`DictEncoder`]
#[derive(Debug, Default)]
struct ByteArrayStorage {
    /// Encoded dictionary data: each value stored as a 4-byte length followed
    /// by its bytes
    page: Vec<u8>,

    /// Byte range within `page` of each interned value (excluding the length
    /// prefix), indexed by dictionary key
    values: Vec<std::ops::Range<usize>>,
}
307
308impl Storage for ByteArrayStorage {
309    type Key = u64;
310    type Value = [u8];
311
312    fn get(&self, idx: Self::Key) -> &Self::Value {
313        &self.page[self.values[idx as usize].clone()]
314    }
315
316    fn push(&mut self, value: &Self::Value) -> Self::Key {
317        let key = self.values.len();
318
319        self.page.reserve(4 + value.len());
320        self.page.extend_from_slice((value.len() as u32).as_bytes());
321
322        let start = self.page.len();
323        self.page.extend_from_slice(value);
324        self.values.push(start..self.page.len());
325
326        key as u64
327    }
328
329    #[allow(dead_code)] // not used in parquet_derive, so is dead there
330    fn estimated_memory_size(&self) -> usize {
331        self.page.capacity() * std::mem::size_of::<u8>()
332            + self.values.capacity() * std::mem::size_of::<std::ops::Range<usize>>()
333    }
334}
335
/// A dictionary encoder for byte array data
#[derive(Debug, Default)]
struct DictEncoder {
    /// Interned storage of the unique values seen so far
    interner: Interner<ByteArrayStorage>,
    /// Dictionary keys for the in-progress data page
    indices: Vec<u64>,
    /// Total length in bytes of the variable-length values written to the
    /// in-progress page
    variable_length_bytes: i64,
}
343
344impl DictEncoder {
345    /// Encode `values` to the in-progress page
346    fn encode<T>(&mut self, values: T, indices: &[usize])
347    where
348        T: ArrayAccessor + Copy,
349        T::Item: AsRef<[u8]>,
350    {
351        self.indices.reserve(indices.len());
352
353        for idx in indices {
354            let value = values.value(*idx);
355            let interned = self.interner.intern(value.as_ref());
356            self.indices.push(interned);
357            self.variable_length_bytes += value.as_ref().len() as i64;
358        }
359    }
360
361    fn bit_width(&self) -> u8 {
362        let length = self.interner.storage().values.len();
363        num_required_bits(length.saturating_sub(1) as u64)
364    }
365
366    fn estimated_memory_size(&self) -> usize {
367        self.interner.estimated_memory_size() + self.indices.capacity() * std::mem::size_of::<u64>()
368    }
369
370    fn estimated_data_page_size(&self) -> usize {
371        let bit_width = self.bit_width();
372        1 + RleEncoder::max_buffer_size(bit_width, self.indices.len())
373    }
374
375    fn estimated_dict_page_size(&self) -> usize {
376        self.interner.storage().page.len()
377    }
378
379    fn flush_dict_page(self) -> DictionaryPage {
380        let storage = self.interner.into_inner();
381
382        DictionaryPage {
383            buf: storage.page.into(),
384            num_values: storage.values.len(),
385            is_sorted: false,
386        }
387    }
388
389    fn flush_data_page(
390        &mut self,
391        min_value: Option<ByteArray>,
392        max_value: Option<ByteArray>,
393    ) -> DataPageValues<ByteArray> {
394        let num_values = self.indices.len();
395        let buffer_len = self.estimated_data_page_size();
396        let mut buffer = Vec::with_capacity(buffer_len);
397        buffer.push(self.bit_width());
398
399        let mut encoder = RleEncoder::new_from_buf(self.bit_width(), buffer);
400        for index in &self.indices {
401            encoder.put(*index)
402        }
403
404        self.indices.clear();
405
406        // Capture value of variable_length_bytes and reset for next page
407        let variable_length_bytes = Some(self.variable_length_bytes);
408        self.variable_length_bytes = 0;
409
410        DataPageValues {
411            buf: encoder.consume().into(),
412            num_values,
413            encoding: Encoding::RLE_DICTIONARY,
414            min_value,
415            max_value,
416            variable_length_bytes,
417        }
418    }
419}
420
/// A [`ColumnValueEncoder`] for byte-array data
pub struct ByteArrayEncoder {
    /// Non-dictionary encoder, used when `dict_encoder` is `None`
    fallback: FallbackEncoder,
    /// Dictionary encoder; `None` once the dictionary has been flushed or
    /// dictionary encoding is disabled
    dict_encoder: Option<DictEncoder>,
    /// Level of statistics to gather for this column
    statistics_enabled: EnabledStatistics,
    /// Running minimum of values written since the last page flush
    min_value: Option<ByteArray>,
    /// Running maximum of values written since the last page flush
    max_value: Option<ByteArray>,
    /// Optional bloom filter accumulating every written value
    bloom_filter: Option<Sbbf>,
    /// Target false-positive probability applied when flushing the bloom filter
    bloom_filter_target_fpp: f64,
    /// Optional accumulator for geospatial statistics
    geo_stats_accumulator: Option<Box<dyn GeoStatsAccumulator>>,
}
431
impl ColumnValueEncoder for ByteArrayEncoder {
    type T = ByteArray;
    type Values = dyn Array;

    /// Takes the in-progress bloom filter, if any, folding it down to the
    /// configured target false-positive probability
    fn flush_bloom_filter(&mut self) -> Option<Sbbf> {
        let mut sbbf = self.bloom_filter.take()?;
        sbbf.fold_to_target_fpp(self.bloom_filter_target_fpp);
        Some(sbbf)
    }

    /// Creates a [`ByteArrayEncoder`] for the given column and writer properties
    fn try_new(descr: &ColumnDescPtr, props: &WriterProperties) -> Result<Self>
    where
        Self: Sized,
    {
        // Dictionary encoding is used only when enabled for this column
        let dictionary = props
            .dictionary_enabled(descr.path())
            .then(DictEncoder::default);

        // The fallback encoder is always constructed, even when dictionary
        // encoding is enabled
        let fallback = FallbackEncoder::new(descr, props)?;

        let (bloom_filter, bloom_filter_target_fpp) = create_bloom_filter(props, descr)?;

        let statistics_enabled = props.statistics_enabled(descr.path());

        let geo_stats_accumulator = try_new_geo_stats_accumulator(descr);

        Ok(Self {
            fallback,
            statistics_enabled,
            bloom_filter,
            bloom_filter_target_fpp,
            dict_encoder: dictionary,
            min_value: None,
            max_value: None,
            geo_stats_accumulator,
        })
    }

    /// Byte arrays are always written via [`Self::write_gather`]
    fn write(&mut self, _values: &Self::Values, _offset: usize, _len: usize) -> Result<()> {
        unreachable!("should call write_gather instead")
    }

    /// Encodes the values at `indices`, dispatching on the array's `DataType`
    /// to the free `encode` function via `downcast_op!`
    fn write_gather(&mut self, values: &Self::Values, indices: &[usize]) -> Result<()> {
        downcast_op!(values.data_type(), values, encode, indices, self);
        Ok(())
    }

    /// Number of values written to the in-progress data page
    fn num_values(&self) -> usize {
        match &self.dict_encoder {
            Some(encoder) => encoder.indices.len(),
            None => self.fallback.num_values,
        }
    }

    fn has_dictionary(&self) -> bool {
        self.dict_encoder.is_some()
    }

    /// Estimate of the total memory used by this encoder, including the
    /// bloom filter and min/max statistics
    fn estimated_memory_size(&self) -> usize {
        let encoder_size = match &self.dict_encoder {
            Some(encoder) => encoder.estimated_memory_size(),
            // For the FallbackEncoder, these unflushed bytes are already encoded.
            // Therefore, the size should be the same as estimated_data_page_size.
            None => self.fallback.estimated_data_page_size(),
        };

        let bloom_filter_size = self
            .bloom_filter
            .as_ref()
            .map(|bf| bf.estimated_memory_size())
            .unwrap_or_default();

        let stats_size = self.min_value.as_ref().map(|v| v.len()).unwrap_or_default()
            + self.max_value.as_ref().map(|v| v.len()).unwrap_or_default();

        encoder_size + bloom_filter_size + stats_size
    }

    /// Estimated size of the dictionary page, or `None` when not using
    /// dictionary encoding
    fn estimated_dict_page_size(&self) -> Option<usize> {
        Some(self.dict_encoder.as_ref()?.estimated_dict_page_size())
    }

    /// Returns an estimate of the data page size in bytes
    ///
    /// This includes:
    /// <already_written_encoded_byte_size> + <estimated_encoded_size_of_unflushed_bytes>
    fn estimated_data_page_size(&self) -> usize {
        match &self.dict_encoder {
            Some(encoder) => encoder.estimated_data_page_size(),
            None => self.fallback.estimated_data_page_size(),
        }
    }

    /// Consumes the dictionary encoder, if any, producing its dictionary page
    ///
    /// # Errors
    ///
    /// Returns an error if there are unflushed data page values
    fn flush_dict_page(&mut self) -> Result<Option<DictionaryPage>> {
        match self.dict_encoder.take() {
            Some(encoder) => {
                if !encoder.indices.is_empty() {
                    return Err(general_err!(
                        "Must flush data pages before flushing dictionary"
                    ));
                }

                Ok(Some(encoder.flush_dict_page()))
            }
            _ => Ok(None),
        }
    }

    /// Flushes the in-progress data page along with the accumulated
    /// min/max statistics
    fn flush_data_page(&mut self) -> Result<DataPageValues<ByteArray>> {
        let min_value = self.min_value.take();
        let max_value = self.max_value.take();

        match &mut self.dict_encoder {
            Some(encoder) => Ok(encoder.flush_data_page(min_value, max_value)),
            _ => self.fallback.flush_data_page(min_value, max_value),
        }
    }

    /// Finishes and returns the accumulated geospatial statistics, if any
    ///
    /// Note: the trailing `?` flattens the nested `Option` produced by `map`
    fn flush_geospatial_statistics(&mut self) -> Option<Box<GeospatialStatistics>> {
        self.geo_stats_accumulator.as_mut().map(|a| a.finish())?
    }
}
553
554/// Encodes the provided `values` and `indices` to `encoder`
555///
556/// This is a free function so it can be used with `downcast_op!`
557fn encode<T>(values: T, indices: &[usize], encoder: &mut ByteArrayEncoder)
558where
559    T: ArrayAccessor + Copy,
560    T::Item: Copy + Ord + AsRef<[u8]>,
561{
562    if encoder.statistics_enabled != EnabledStatistics::None {
563        if let Some(accumulator) = encoder.geo_stats_accumulator.as_mut() {
564            update_geo_stats_accumulator(accumulator.as_mut(), values, indices.iter().cloned());
565        } else if let Some((min, max)) = compute_min_max(values, indices.iter().cloned()) {
566            if encoder.min_value.as_ref().is_none_or(|m| m > &min) {
567                encoder.min_value = Some(min);
568            }
569
570            if encoder.max_value.as_ref().is_none_or(|m| m < &max) {
571                encoder.max_value = Some(max);
572            }
573        }
574    }
575
576    // encode the values into bloom filter if enabled
577    if let Some(bloom_filter) = &mut encoder.bloom_filter {
578        let valid = indices.iter().cloned();
579        for idx in valid {
580            bloom_filter.insert(values.value(idx).as_ref());
581        }
582    }
583
584    match &mut encoder.dict_encoder {
585        Some(dict_encoder) => dict_encoder.encode(values, indices),
586        None => encoder.fallback.encode(values, indices),
587    }
588}
589
590/// Computes the min and max for the provided array and indices
591///
592/// This is a free function so it can be used with `downcast_op!`
593fn compute_min_max<T>(
594    array: T,
595    mut valid: impl Iterator<Item = usize>,
596) -> Option<(ByteArray, ByteArray)>
597where
598    T: ArrayAccessor,
599    T::Item: Copy + Ord + AsRef<[u8]>,
600{
601    let first_idx = valid.next()?;
602
603    let first_val = array.value(first_idx);
604    let mut min = first_val;
605    let mut max = first_val;
606    for idx in valid {
607        let val = array.value(idx);
608        min = min.min(val);
609        max = max.max(val);
610    }
611    Some((min.as_ref().to_vec().into(), max.as_ref().to_vec().into()))
612}
613
614/// Updates geospatial statistics for the provided array and indices
615fn update_geo_stats_accumulator<T>(
616    bounder: &mut dyn GeoStatsAccumulator,
617    array: T,
618    valid: impl Iterator<Item = usize>,
619) where
620    T: ArrayAccessor,
621    T::Item: Copy + Ord + AsRef<[u8]>,
622{
623    if bounder.is_valid() {
624        for idx in valid {
625            let val = array.value(idx);
626            bounder.update_wkb(val.as_ref());
627        }
628    }
629}