parquet/arrow/arrow_writer/byte_array.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::basic::Encoding;
use crate::bloom_filter::Sbbf;
use crate::column::writer::encoder::{ColumnValueEncoder, DataPageValues, DictionaryPage};
use crate::data_type::{AsBytes, ByteArray, Int32Type};
use crate::encodings::encoding::{DeltaBitPackEncoder, Encoder};
use crate::encodings::rle::RleEncoder;
use crate::errors::{ParquetError, Result};
use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion};
use crate::geospatial::accumulator::{GeoStatsAccumulator, try_new_geo_stats_accumulator};
use crate::geospatial::statistics::GeospatialStatistics;
use crate::schema::types::ColumnDescPtr;
use crate::util::bit_util::num_required_bits;
use crate::util::interner::{Interner, Storage};
use arrow_array::{
    Array, ArrayAccessor, BinaryArray, BinaryViewArray, DictionaryArray, FixedSizeBinaryArray,
    LargeBinaryArray, LargeStringArray, StringArray, StringViewArray,
};
use arrow_schema::DataType;

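/// Invokes `$op` with `$array` downcast to a [`DictionaryArray`] keyed by
/// `$key`, with its values in turn downcast to `$val` via `downcast_dict`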
macro_rules! downcast_dict_impl {
    ($array:ident, $key:ident, $val:ident, $op:expr $(, $arg:expr)*) => {{
        $op($array
            .as_any()
            .downcast_ref::<DictionaryArray<arrow_array::types::$key>>()
            .unwrap()
            .downcast_dict::<$val>()
            .unwrap()$(, $arg)*)
    }};
}

/// Dispatches on the dictionary key type, then defers to [`downcast_dict_impl`]
macro_rules! downcast_dict_op {
    ($key_type:expr, $val:ident, $array:ident, $op:expr $(, $arg:expr)*) => {
        match $key_type.as_ref() {
            DataType::UInt8 => downcast_dict_impl!($array, UInt8Type, $val, $op$(, $arg)*),
            DataType::UInt16 => downcast_dict_impl!($array, UInt16Type, $val, $op$(, $arg)*),
            DataType::UInt32 => downcast_dict_impl!($array, UInt32Type, $val, $op$(, $arg)*),
            DataType::UInt64 => downcast_dict_impl!($array, UInt64Type, $val, $op$(, $arg)*),
            DataType::Int8 => downcast_dict_impl!($array, Int8Type, $val, $op$(, $arg)*),
            DataType::Int16 => downcast_dict_impl!($array, Int16Type, $val, $op$(, $arg)*),
            DataType::Int32 => downcast_dict_impl!($array, Int32Type, $val, $op$(, $arg)*),
            DataType::Int64 => downcast_dict_impl!($array, Int64Type, $val, $op$(, $arg)*),
            _ => unreachable!(),
        }
    };
}

/// Invokes `$op` with `$array` downcast to the concrete byte array type
/// implied by `$data_type`
macro_rules! downcast_op {
    ($data_type:expr, $array:ident, $op:expr $(, $arg:expr)*) => {
        match $data_type {
            DataType::Utf8 => $op($array.as_any().downcast_ref::<StringArray>().unwrap()$(, $arg)*),
            DataType::LargeUtf8 => {
                $op($array.as_any().downcast_ref::<LargeStringArray>().unwrap()$(, $arg)*)
            }
            DataType::Utf8View => $op($array.as_any().downcast_ref::<StringViewArray>().unwrap()$(, $arg)*),
            DataType::Binary => {
                $op($array.as_any().downcast_ref::<BinaryArray>().unwrap()$(, $arg)*)
            }
            DataType::LargeBinary => {
                $op($array.as_any().downcast_ref::<LargeBinaryArray>().unwrap()$(, $arg)*)
            }
            DataType::BinaryView => {
                $op($array.as_any().downcast_ref::<BinaryViewArray>().unwrap()$(, $arg)*)
            }
            DataType::Dictionary(key, value) => match value.as_ref() {
                DataType::Utf8 => downcast_dict_op!(key, StringArray, $array, $op$(, $arg)*),
                DataType::LargeUtf8 => {
                    downcast_dict_op!(key, LargeStringArray, $array, $op$(, $arg)*)
                }
                DataType::Binary => downcast_dict_op!(key, BinaryArray, $array, $op$(, $arg)*),
                DataType::LargeBinary => {
                    downcast_dict_op!(key, LargeBinaryArray, $array, $op$(, $arg)*)
                }
                DataType::FixedSizeBinary(_) => {
                    downcast_dict_op!(key, FixedSizeBinaryArray, $array, $op$(, $arg)*)
                }
                d => unreachable!("cannot downcast {} dictionary value to byte array", d),
            },
            d => unreachable!("cannot downcast {} to byte array", d),
        }
    };
}
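
// Example: `downcast_op!(values.data_type(), values, encode, indices, self)`
// expands for `DataType::Utf8` to roughly
// `encode(values.as_any().downcast_ref::<StringArray>().unwrap(), indices, self)`.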

/// A fallback encoder, i.e. non-dictionary, for [`ByteArray`]
struct FallbackEncoder {
    encoder: FallbackEncoderImpl,
    num_values: usize,
    variable_length_bytes: i64,
}

/// The fallback encoder in use
///
/// Note: DeltaBitPackEncoder is boxed as it is rather large
enum FallbackEncoderImpl {
    Plain {
        buffer: Vec<u8>,
    },
    DeltaLength {
        buffer: Vec<u8>,
        lengths: Box<DeltaBitPackEncoder<Int32Type>>,
    },
    Delta {
        buffer: Vec<u8>,
        last_value: Vec<u8>,
        prefix_lengths: Box<DeltaBitPackEncoder<Int32Type>>,
        suffix_lengths: Box<DeltaBitPackEncoder<Int32Type>>,
    },
}
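
// Rough data page layouts produced by each variant, per the Parquet spec:
//
// * `Plain`: each value as a 4-byte little-endian length followed by the
//   value bytes
// * `DeltaLength`: all value lengths (DELTA_BINARY_PACKED), then the
//   concatenated value bytes
// * `Delta`: prefix lengths, then suffix lengths (both DELTA_BINARY_PACKED),
//   then the concatenated suffixes (front coding)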

impl FallbackEncoder {
    /// Create the fallback encoder for the given [`ColumnDescPtr`] and [`WriterProperties`]
    fn new(descr: &ColumnDescPtr, props: &WriterProperties) -> Result<Self> {
        // Use the explicitly configured encoding, if any, otherwise the
        // default encoding for the writer version.
        let encoding =
            props
                .encoding(descr.path())
                .unwrap_or_else(|| match props.writer_version() {
                    WriterVersion::PARQUET_1_0 => Encoding::PLAIN,
                    WriterVersion::PARQUET_2_0 => Encoding::DELTA_BYTE_ARRAY,
                });

        let encoder = match encoding {
            Encoding::PLAIN => FallbackEncoderImpl::Plain { buffer: vec![] },
            Encoding::DELTA_LENGTH_BYTE_ARRAY => FallbackEncoderImpl::DeltaLength {
                buffer: vec![],
                lengths: Box::new(DeltaBitPackEncoder::new()),
            },
            Encoding::DELTA_BYTE_ARRAY => FallbackEncoderImpl::Delta {
                buffer: vec![],
                last_value: vec![],
                prefix_lengths: Box::new(DeltaBitPackEncoder::new()),
                suffix_lengths: Box::new(DeltaBitPackEncoder::new()),
            },
            _ => {
                return Err(general_err!(
                    "unsupported encoding {} for byte array",
                    encoding
                ));
            }
        };

        Ok(Self {
            encoder,
            num_values: 0,
            variable_length_bytes: 0,
        })
    }

    /// Encode `values` to the in-progress page
    fn encode<T>(&mut self, values: T, indices: &[usize])
    where
        T: ArrayAccessor + Copy,
        T::Item: AsRef<[u8]>,
    {
        self.num_values += indices.len();
        match &mut self.encoder {
            FallbackEncoderImpl::Plain { buffer } => {
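                // e.g. encoding ["a", "bc"] appends
                // [1, 0, 0, 0, b'a', 2, 0, 0, 0, b'b', b'c']: a u32 length
                // prefix (little-endian on typical targets) then the raw bytes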
                for idx in indices {
                    let value = values.value(*idx);
                    let value = value.as_ref();
                    buffer.extend_from_slice((value.len() as u32).as_bytes());
                    buffer.extend_from_slice(value);
                    self.variable_length_bytes += value.len() as i64;
                }
            }
            FallbackEncoderImpl::DeltaLength { buffer, lengths } => {
                for idx in indices {
                    let value = values.value(*idx);
                    let value = value.as_ref();
                    lengths.put(&[value.len() as i32]).unwrap();
                    buffer.extend_from_slice(value);
                    self.variable_length_bytes += value.len() as i64;
                }
            }
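            // DELTA_BYTE_ARRAY front-codes each value against its predecessor:
            // writing "Hello" then "Help" records prefix lengths [0, 3], suffix
            // lengths [5, 1], and appends "Hello" then "p" to the buffer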
            FallbackEncoderImpl::Delta {
                buffer,
                last_value,
                prefix_lengths,
                suffix_lengths,
            } => {
                for idx in indices {
                    let value = values.value(*idx);
                    let value = value.as_ref();
                    let mut prefix_length = 0;

                    while prefix_length < last_value.len()
                        && prefix_length < value.len()
                        && last_value[prefix_length] == value[prefix_length]
                    {
                        prefix_length += 1;
                    }

                    let suffix_length = value.len() - prefix_length;

                    last_value.clear();
                    last_value.extend_from_slice(value);

                    buffer.extend_from_slice(&value[prefix_length..]);
                    prefix_lengths.put(&[prefix_length as i32]).unwrap();
                    suffix_lengths.put(&[suffix_length as i32]).unwrap();
                    self.variable_length_bytes += value.len() as i64;
                }
            }
        }
    }

    /// Returns an estimate of the data page size in bytes
    ///
    /// This includes:
    /// <already_written_encoded_byte_size> + <estimated_encoded_size_of_unflushed_bytes>
    fn estimated_data_page_size(&self) -> usize {
        match &self.encoder {
            FallbackEncoderImpl::Plain { buffer, .. } => buffer.len(),
            FallbackEncoderImpl::DeltaLength { buffer, lengths } => {
                buffer.len() + lengths.estimated_data_encoded_size()
            }
            FallbackEncoderImpl::Delta {
                buffer,
                prefix_lengths,
                suffix_lengths,
                ..
            } => {
                buffer.len()
                    + prefix_lengths.estimated_data_encoded_size()
                    + suffix_lengths.estimated_data_encoded_size()
            }
        }
    }

    fn flush_data_page(
        &mut self,
        min_value: Option<ByteArray>,
        max_value: Option<ByteArray>,
    ) -> Result<DataPageValues<ByteArray>> {
        let (buf, encoding) = match &mut self.encoder {
            FallbackEncoderImpl::Plain { buffer } => (std::mem::take(buffer), Encoding::PLAIN),
            FallbackEncoderImpl::DeltaLength { buffer, lengths } => {
                let lengths = lengths.flush_buffer()?;

                let mut out = Vec::with_capacity(lengths.len() + buffer.len());
                out.extend_from_slice(&lengths);
                out.extend_from_slice(buffer);
                buffer.clear();
                (out, Encoding::DELTA_LENGTH_BYTE_ARRAY)
            }
            FallbackEncoderImpl::Delta {
                buffer,
                prefix_lengths,
                suffix_lengths,
                last_value,
            } => {
                let prefix_lengths = prefix_lengths.flush_buffer()?;
                let suffix_lengths = suffix_lengths.flush_buffer()?;

                let mut out =
                    Vec::with_capacity(prefix_lengths.len() + suffix_lengths.len() + buffer.len());
                out.extend_from_slice(&prefix_lengths);
                out.extend_from_slice(&suffix_lengths);
                out.extend_from_slice(buffer);
                buffer.clear();
                last_value.clear();
                (out, Encoding::DELTA_BYTE_ARRAY)
            }
        };

        // Capture value of variable_length_bytes and reset for next page
        let variable_length_bytes = Some(self.variable_length_bytes);
        self.variable_length_bytes = 0;

        Ok(DataPageValues {
            buf: buf.into(),
            num_values: std::mem::take(&mut self.num_values),
            encoding,
            min_value,
            max_value,
            variable_length_bytes,
        })
    }
}

/// [`Storage`] for the [`Interner`] used by [`DictEncoder`]
#[derive(Debug, Default)]
struct ByteArrayStorage {
    /// Encoded dictionary data
    page: Vec<u8>,

    values: Vec<std::ops::Range<usize>>,
}
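
// For example, after `push(b"ab")` and `push(b"c")`, `page` holds
// [2, 0, 0, 0, b'a', b'b', 1, 0, 0, 0, b'c'] (length prefixes shown
// little-endian) and `values` holds [4..6, 10..11]; `page` can therefore
// be flushed directly as a PLAIN-encoded dictionary page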

impl Storage for ByteArrayStorage {
    type Key = u64;
    type Value = [u8];

    fn get(&self, idx: Self::Key) -> &Self::Value {
        &self.page[self.values[idx as usize].clone()]
    }

    fn push(&mut self, value: &Self::Value) -> Self::Key {
        let key = self.values.len();

        self.page.reserve(4 + value.len());
        self.page.extend_from_slice((value.len() as u32).as_bytes());

        let start = self.page.len();
        self.page.extend_from_slice(value);
        self.values.push(start..self.page.len());

        key as u64
    }

    #[allow(dead_code)] // not used in parquet_derive, so is dead there
    fn estimated_memory_size(&self) -> usize {
        self.page.capacity() * std::mem::size_of::<u8>()
            + self.values.capacity() * std::mem::size_of::<std::ops::Range<usize>>()
    }
}

/// A dictionary encoder for byte array data
#[derive(Debug, Default)]
struct DictEncoder {
    interner: Interner<ByteArrayStorage>,
    indices: Vec<u64>,
    variable_length_bytes: i64,
}

impl DictEncoder {
    /// Encode `values` to the in-progress page
    fn encode<T>(&mut self, values: T, indices: &[usize])
    where
        T: ArrayAccessor + Copy,
        T::Item: AsRef<[u8]>,
    {
        self.indices.reserve(indices.len());

        for idx in indices {
            let value = values.value(*idx);
            let interned = self.interner.intern(value.as_ref());
            self.indices.push(interned);
            self.variable_length_bytes += value.as_ref().len() as i64;
        }
    }

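    // Dictionary indices range over 0..len-1: a dictionary of 5 entries needs
    // num_required_bits(4) = 3 bits per index, and an empty one needs 0 bits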
    fn bit_width(&self) -> u8 {
        let length = self.interner.storage().values.len();
        num_required_bits(length.saturating_sub(1) as u64)
    }

    fn estimated_memory_size(&self) -> usize {
        self.interner.estimated_memory_size() + self.indices.capacity() * std::mem::size_of::<u64>()
    }

    fn estimated_data_page_size(&self) -> usize {
        let bit_width = self.bit_width();
        1 + RleEncoder::max_buffer_size(bit_width, self.indices.len())
    }

    fn estimated_dict_page_size(&self) -> usize {
        self.interner.storage().page.len()
    }

    fn flush_dict_page(self) -> DictionaryPage {
        let storage = self.interner.into_inner();

        DictionaryPage {
            buf: storage.page.into(),
            num_values: storage.values.len(),
            is_sorted: false,
        }
    }

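    // An RLE_DICTIONARY data page is a single bit-width byte followed by the
    // RLE/bit-packed run of dictionary indices (hence the `1 +` in
    // `estimated_data_page_size` above)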
    fn flush_data_page(
        &mut self,
        min_value: Option<ByteArray>,
        max_value: Option<ByteArray>,
    ) -> DataPageValues<ByteArray> {
        let num_values = self.indices.len();
        let buffer_len = self.estimated_data_page_size();
        let mut buffer = Vec::with_capacity(buffer_len);
        buffer.push(self.bit_width());

        let mut encoder = RleEncoder::new_from_buf(self.bit_width(), buffer);
        for index in &self.indices {
            encoder.put(*index)
        }

        self.indices.clear();

        // Capture value of variable_length_bytes and reset for next page
        let variable_length_bytes = Some(self.variable_length_bytes);
        self.variable_length_bytes = 0;

        DataPageValues {
            buf: encoder.consume().into(),
            num_values,
            encoding: Encoding::RLE_DICTIONARY,
            min_value,
            max_value,
            variable_length_bytes,
        }
    }
}

pub struct ByteArrayEncoder {
    fallback: FallbackEncoder,
    dict_encoder: Option<DictEncoder>,
    statistics_enabled: EnabledStatistics,
    min_value: Option<ByteArray>,
    max_value: Option<ByteArray>,
    bloom_filter: Option<Sbbf>,
    geo_stats_accumulator: Option<Box<dyn GeoStatsAccumulator>>,
}

impl ColumnValueEncoder for ByteArrayEncoder {
    type T = ByteArray;
    type Values = dyn Array;
    fn flush_bloom_filter(&mut self) -> Option<Sbbf> {
        self.bloom_filter.take()
    }

    fn try_new(descr: &ColumnDescPtr, props: &WriterProperties) -> Result<Self>
    where
        Self: Sized,
    {
        let dictionary = props
            .dictionary_enabled(descr.path())
            .then(DictEncoder::default);

        let fallback = FallbackEncoder::new(descr, props)?;

        let bloom_filter = props
            .bloom_filter_properties(descr.path())
            .map(|props| Sbbf::new_with_ndv_fpp(props.ndv, props.fpp))
            .transpose()?;

        let statistics_enabled = props.statistics_enabled(descr.path());

        let geo_stats_accumulator = try_new_geo_stats_accumulator(descr);

        Ok(Self {
            fallback,
            statistics_enabled,
            bloom_filter,
            dict_encoder: dictionary,
            min_value: None,
            max_value: None,
            geo_stats_accumulator,
        })
    }

    fn write(&mut self, _values: &Self::Values, _offset: usize, _len: usize) -> Result<()> {
        unreachable!("should call write_gather instead")
    }

    fn write_gather(&mut self, values: &Self::Values, indices: &[usize]) -> Result<()> {
        downcast_op!(values.data_type(), values, encode, indices, self);
        Ok(())
    }

    fn num_values(&self) -> usize {
        match &self.dict_encoder {
            Some(encoder) => encoder.indices.len(),
            None => self.fallback.num_values,
        }
    }

    fn has_dictionary(&self) -> bool {
        self.dict_encoder.is_some()
    }

    fn estimated_memory_size(&self) -> usize {
        let encoder_size = match &self.dict_encoder {
            Some(encoder) => encoder.estimated_memory_size(),
            // For the FallbackEncoder, these unflushed bytes are already encoded.
            // Therefore, the size should be the same as estimated_data_page_size.
            None => self.fallback.estimated_data_page_size(),
        };

        let bloom_filter_size = self
            .bloom_filter
            .as_ref()
            .map(|bf| bf.estimated_memory_size())
            .unwrap_or_default();

        let stats_size = self.min_value.as_ref().map(|v| v.len()).unwrap_or_default()
            + self.max_value.as_ref().map(|v| v.len()).unwrap_or_default();

        encoder_size + bloom_filter_size + stats_size
    }

    fn estimated_dict_page_size(&self) -> Option<usize> {
        Some(self.dict_encoder.as_ref()?.estimated_dict_page_size())
    }

    /// Returns an estimate of the data page size in bytes
    ///
    /// This includes:
    /// <already_written_encoded_byte_size> + <estimated_encoded_size_of_unflushed_bytes>
    fn estimated_data_page_size(&self) -> usize {
        match &self.dict_encoder {
            Some(encoder) => encoder.estimated_data_page_size(),
            None => self.fallback.estimated_data_page_size(),
        }
    }

    fn flush_dict_page(&mut self) -> Result<Option<DictionaryPage>> {
        match self.dict_encoder.take() {
            Some(encoder) => {
                if !encoder.indices.is_empty() {
                    return Err(general_err!(
                        "Must flush data pages before flushing dictionary"
                    ));
                }

                Ok(Some(encoder.flush_dict_page()))
            }
            _ => Ok(None),
        }
    }

    fn flush_data_page(&mut self) -> Result<DataPageValues<ByteArray>> {
        let min_value = self.min_value.take();
        let max_value = self.max_value.take();

        match &mut self.dict_encoder {
            Some(encoder) => Ok(encoder.flush_data_page(min_value, max_value)),
            _ => self.fallback.flush_data_page(min_value, max_value),
        }
    }

    fn flush_geospatial_statistics(&mut self) -> Option<Box<GeospatialStatistics>> {
        self.geo_stats_accumulator.as_mut().map(|a| a.finish())?
    }
}
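
// A sketch of the expected call sequence (as driven by the column writer
// elsewhere in this crate): `write_gather` for each batch of values,
// `flush_data_page` at each page boundary, and `flush_dict_page` once, after
// all data pages in the row group have been flushed; this ordering is why
// `flush_dict_page` rejects unflushed indices.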

/// Encodes the provided `values` and `indices` to `encoder`
///
/// This is a free function so it can be used with `downcast_op!`
fn encode<T>(values: T, indices: &[usize], encoder: &mut ByteArrayEncoder)
where
    T: ArrayAccessor + Copy,
    T::Item: Copy + Ord + AsRef<[u8]>,
{
    if encoder.statistics_enabled != EnabledStatistics::None {
        if let Some(accumulator) = encoder.geo_stats_accumulator.as_mut() {
            update_geo_stats_accumulator(accumulator.as_mut(), values, indices.iter().cloned());
        } else if let Some((min, max)) = compute_min_max(values, indices.iter().cloned()) {
            if encoder.min_value.as_ref().is_none_or(|m| m > &min) {
                encoder.min_value = Some(min);
            }

            if encoder.max_value.as_ref().is_none_or(|m| m < &max) {
                encoder.max_value = Some(max);
            }
        }
    }

    // encode the values into bloom filter if enabled
    if let Some(bloom_filter) = &mut encoder.bloom_filter {
        let valid = indices.iter().cloned();
        for idx in valid {
            bloom_filter.insert(values.value(idx).as_ref());
        }
    }

    match &mut encoder.dict_encoder {
        Some(dict_encoder) => dict_encoder.encode(values, indices),
        None => encoder.fallback.encode(values, indices),
    }
}

/// Computes the min and max for the provided array and indices
///
/// This is a free function so it can be used with `downcast_op!`
fn compute_min_max<T>(
    array: T,
    mut valid: impl Iterator<Item = usize>,
) -> Option<(ByteArray, ByteArray)>
where
    T: ArrayAccessor,
    T::Item: Copy + Ord + AsRef<[u8]>,
{
    let first_idx = valid.next()?;

    let first_val = array.value(first_idx);
    let mut min = first_val;
    let mut max = first_val;
    // `T::Item: Ord` compares strings and byte slices lexicographically,
    // i.e. by unsigned byte-wise comparison
    for idx in valid {
        let val = array.value(idx);
        min = min.min(val);
        max = max.max(val);
    }
    Some((min.as_ref().to_vec().into(), max.as_ref().to_vec().into()))
}

/// Updates geospatial statistics for the provided array and indices
fn update_geo_stats_accumulator<T>(
    bounder: &mut dyn GeoStatsAccumulator,
    array: T,
    valid: impl Iterator<Item = usize>,
) where
    T: ArrayAccessor,
    T::Item: Copy + Ord + AsRef<[u8]>,
{
    if bounder.is_valid() {
        for idx in valid {
            let val = array.value(idx);
            bounder.update_wkb(val.as_ref());
        }
    }
}
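
#[cfg(test)]
mod tests {
    use super::*;

    // An illustrative sanity check of `ByteArrayStorage` (a minimal sketch,
    // not exhaustive): values are length-prefixed into `page` and retrievable
    // by the keys returned from `push`.
    #[test]
    fn byte_array_storage_push_get() {
        let mut storage = ByteArrayStorage::default();
        let hello = storage.push(b"hello");
        let help = storage.push(b"help");
        assert_eq!(storage.get(hello), b"hello");
        assert_eq!(storage.get(help), b"help");
        // Each value carries a 4-byte length prefix in `page`
        assert_eq!(storage.page.len(), (4 + 5) + (4 + 4));
    }
}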