Skip to main content

parquet/column/writer/
encoder.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use bytes::Bytes;
19use half::f16;
20
21use crate::basic::{ConvertedType, Encoding, LogicalType, Type};
22use crate::bloom_filter::Sbbf;
23use crate::column::writer::{
24    compare_greater, fallback_encoding, has_dictionary_support, is_nan, update_max, update_min,
25};
26use crate::data_type::DataType;
27use crate::data_type::private::ParquetValueType;
28use crate::encodings::encoding::{DictEncoder, Encoder, get_encoder};
29use crate::errors::{ParquetError, Result};
30use crate::file::properties::{EnabledStatistics, WriterProperties};
31use crate::geospatial::accumulator::{GeoStatsAccumulator, try_new_geo_stats_accumulator};
32use crate::geospatial::statistics::GeospatialStatistics;
33use crate::schema::types::{ColumnDescPtr, ColumnDescriptor};
34
/// A batch of [`ParquetValueType`] values that a [`ColumnValueEncoder`] can encode
pub trait ColumnValues {
    /// Returns how many values this collection holds
    fn len(&self) -> usize;
}
40
// Arrow arrays can be used as column values when the `arrow` feature is on;
// the length reported is the array's own length.
#[cfg(feature = "arrow")]
impl ColumnValues for dyn arrow_array::Array {
    fn len(&self) -> usize {
        // Name the `Array` trait explicitly so this cannot resolve to
        // `ColumnValues::len` and recurse.
        <Self as arrow_array::Array>::len(self)
    }
}
47
48impl<T: ParquetValueType> ColumnValues for [T] {
49    fn len(&self) -> usize {
50        self.len()
51    }
52}
53
54/// The encoded data for a dictionary page
55pub struct DictionaryPage {
56    pub buf: Bytes,
57    pub num_values: usize,
58    pub is_sorted: bool,
59}
60
61/// The encoded values for a data page, with optional statistics
62pub struct DataPageValues<T> {
63    pub buf: Bytes,
64    pub num_values: usize,
65    pub encoding: Encoding,
66    pub min_value: Option<T>,
67    pub max_value: Option<T>,
68    pub variable_length_bytes: Option<i64>,
69}
70
71/// A generic encoder of [`ColumnValues`] to data and dictionary pages used by
72/// [super::GenericColumnWriter`]
73pub trait ColumnValueEncoder {
74    /// The underlying value type of [`Self::Values`]
75    ///
76    /// Note: this avoids needing to fully qualify `<Self::Values as ColumnValues>::T`
77    type T: ParquetValueType;
78
79    /// The values encoded by this encoder
80    type Values: ColumnValues + ?Sized;
81
82    /// Create a new [`ColumnValueEncoder`]
83    fn try_new(descr: &ColumnDescPtr, props: &WriterProperties) -> Result<Self>
84    where
85        Self: Sized;
86
87    /// Write the corresponding values to this [`ColumnValueEncoder`]
88    fn write(&mut self, values: &Self::Values, offset: usize, len: usize) -> Result<()>;
89
90    /// Write the values at the indexes in `indices` to this [`ColumnValueEncoder`]
91    fn write_gather(&mut self, values: &Self::Values, indices: &[usize]) -> Result<()>;
92
93    /// Returns the largest `k` such that the first `k` values in
94    /// `values[offset..offset + len]` encode to at most `byte_budget`
95    /// bytes — i.e. how many values fit in a single page byte budget.
96    ///
97    /// Returns `len` if every value fits. Returns at least 1 if a single
98    /// value alone exceeds the budget, matching parquet's "at least one
99    /// value per data page" rule.
100    ///
101    /// `None` means "no cheap estimate available"; the caller stays on
102    /// the batched fast path and lets the post-write
103    /// `should_add_data_page` check handle bounding.
104    ///
105    /// Implementations should short-circuit aggressively: the typical
106    /// case is "everything fits, return `len`", and the next-most-common
107    /// case is "one wide value, return 1." The variable-width walk only
108    /// needs to be precise when the chunk is genuinely near the budget.
109    fn count_values_within_byte_budget(
110        _values: &Self::Values,
111        _offset: usize,
112        _len: usize,
113        _byte_budget: usize,
114    ) -> Option<usize> {
115        None
116    }
117
118    /// As [`Self::count_values_within_byte_budget`] but using gather
119    /// `indices` rather than a contiguous range. Returns the number of
120    /// `indices` that fit, not the maximum index value.
121    fn count_values_within_byte_budget_gather(
122        _values: &Self::Values,
123        _indices: &[usize],
124        _byte_budget: usize,
125    ) -> Option<usize> {
126        None
127    }
128
129    /// Returns the number of buffered values
130    fn num_values(&self) -> usize;
131
132    /// Returns true if this encoder has a dictionary page
133    fn has_dictionary(&self) -> bool;
134
135    /// Returns the estimated total memory usage of the encoder
136    ///
137    fn estimated_memory_size(&self) -> usize;
138
139    /// Returns an estimate of the encoded size of dictionary page size in bytes, or `None` if no dictionary
140    fn estimated_dict_page_size(&self) -> Option<usize>;
141
142    /// Returns an estimate of the encoded data page size in bytes
143    ///
144    /// This should include:
145    /// <already_written_encoded_byte_size> + <estimated_encoded_size_of_unflushed_bytes>
146    fn estimated_data_page_size(&self) -> usize;
147
148    /// Flush the dictionary page for this column chunk if any. Any subsequent calls to
149    /// [`Self::write`] will not be dictionary encoded
150    ///
151    /// Note: [`Self::flush_data_page`] must be called first, as this will error if there
152    /// are any pending page values
153    fn flush_dict_page(&mut self) -> Result<Option<DictionaryPage>>;
154
155    /// Flush the next data page for this column chunk
156    fn flush_data_page(&mut self) -> Result<DataPageValues<Self::T>>;
157
158    /// Flushes bloom filter if enabled and returns it, otherwise returns `None`. Subsequent writes
159    /// will *not* be tracked by the bloom filter as it is empty since. This should be called once
160    /// near the end of encoding.
161    fn flush_bloom_filter(&mut self) -> Option<Sbbf>;
162
163    /// Computes [`GeospatialStatistics`], if any, and resets internal state such that any internal
164    /// accumulator is prepared to accumulate statistics for the next column chunk.
165    fn flush_geospatial_statistics(&mut self) -> Option<Box<GeospatialStatistics>>;
166}
167
168pub struct ColumnValueEncoderImpl<T: DataType> {
169    encoder: Box<dyn Encoder<T>>,
170    dict_encoder: Option<DictEncoder<T>>,
171    descr: ColumnDescPtr,
172    num_values: usize,
173    statistics_enabled: EnabledStatistics,
174    min_value: Option<T::T>,
175    max_value: Option<T::T>,
176    bloom_filter: Option<Sbbf>,
177    bloom_filter_target_fpp: f64,
178    variable_length_bytes: Option<i64>,
179    geo_stats_accumulator: Option<Box<dyn GeoStatsAccumulator>>,
180}
181
182impl<T: DataType> ColumnValueEncoderImpl<T> {
183    fn min_max(&self, values: &[T::T], value_indices: Option<&[usize]>) -> Option<(T::T, T::T)> {
184        match value_indices {
185            Some(indices) => get_min_max(&self.descr, indices.iter().map(|x| &values[*x])),
186            None => get_min_max(&self.descr, values.iter()),
187        }
188    }
189
190    fn write_slice(&mut self, slice: &[T::T]) -> Result<()> {
191        if self.statistics_enabled != EnabledStatistics::None
192            // INTERVAL, Geometry, and Geography have undefined sort order, so don't write min/max stats for them
193            && self.descr.converted_type() != ConvertedType::INTERVAL
194        {
195            if let Some(accumulator) = self.geo_stats_accumulator.as_deref_mut() {
196                update_geo_stats_accumulator(accumulator, slice.iter());
197            } else if let Some((min, max)) = self.min_max(slice, None) {
198                update_min(&self.descr, &min, &mut self.min_value);
199                update_max(&self.descr, &max, &mut self.max_value);
200            }
201
202            if let Some(var_bytes) = T::T::variable_length_bytes(slice) {
203                *self.variable_length_bytes.get_or_insert(0) += var_bytes;
204            }
205        }
206
207        // encode the values into bloom filter if enabled
208        if let Some(bloom_filter) = &mut self.bloom_filter {
209            for value in slice {
210                bloom_filter.insert(value);
211            }
212        }
213
214        match &mut self.dict_encoder {
215            Some(encoder) => encoder.put(slice),
216            _ => self.encoder.put(slice),
217        }
218    }
219}
220
221impl<T: DataType> ColumnValueEncoder for ColumnValueEncoderImpl<T> {
222    type T = T::T;
223
224    type Values = [T::T];
225
226    fn flush_bloom_filter(&mut self) -> Option<Sbbf> {
227        let mut sbbf = self.bloom_filter.take()?;
228        sbbf.fold_to_target_fpp(self.bloom_filter_target_fpp);
229        Some(sbbf)
230    }
231
232    fn try_new(descr: &ColumnDescPtr, props: &WriterProperties) -> Result<Self> {
233        let dict_supported = props.dictionary_enabled(descr.path())
234            && has_dictionary_support(T::get_physical_type(), props);
235        let dict_encoder = dict_supported.then(|| DictEncoder::new(descr.clone()));
236
237        // Set either main encoder or fallback encoder.
238        let encoder = get_encoder(
239            props
240                .encoding(descr.path())
241                .unwrap_or_else(|| fallback_encoding(T::get_physical_type(), props)),
242            descr,
243        )?;
244
245        let statistics_enabled = props.statistics_enabled(descr.path());
246
247        let (bloom_filter, bloom_filter_target_fpp) = create_bloom_filter(props, descr)?;
248
249        let geo_stats_accumulator = try_new_geo_stats_accumulator(descr);
250
251        Ok(Self {
252            encoder,
253            dict_encoder,
254            descr: descr.clone(),
255            num_values: 0,
256            statistics_enabled,
257            bloom_filter,
258            bloom_filter_target_fpp,
259            min_value: None,
260            max_value: None,
261            variable_length_bytes: None,
262            geo_stats_accumulator,
263        })
264    }
265
266    fn write(&mut self, values: &[T::T], offset: usize, len: usize) -> Result<()> {
267        self.num_values += len;
268
269        let slice = values.get(offset..offset + len).ok_or_else(|| {
270            general_err!(
271                "Expected to write {} values, but have only {}",
272                len,
273                values.len() - offset
274            )
275        })?;
276
277        self.write_slice(slice)
278    }
279
280    fn write_gather(&mut self, values: &Self::Values, indices: &[usize]) -> Result<()> {
281        self.num_values += indices.len();
282        let slice: Vec<_> = indices.iter().map(|idx| values[*idx].clone()).collect();
283        self.write_slice(&slice)
284    }
285
286    fn count_values_within_byte_budget(
287        values: &[T::T],
288        offset: usize,
289        len: usize,
290        byte_budget: usize,
291    ) -> Option<usize> {
292        // Clamp so that a caller-supplied `len` that overruns the input
293        // (e.g. a level/value mismatch the encoder will reject later)
294        // returns an estimate instead of panicking here.
295        let end = (offset + len).min(values.len());
296        let start = offset.min(end);
297        count_within_budget::<T>(
298            end - start,
299            byte_budget,
300            values[start..end].iter().map(Some),
301        )
302    }
303
304    fn count_values_within_byte_budget_gather(
305        values: &[T::T],
306        indices: &[usize],
307        byte_budget: usize,
308    ) -> Option<usize> {
309        // `values.get` yields `None` for an out-of-range index (defensive
310        // against a level/value mismatch the encoder rejects later); such a
311        // position is counted but contributes no bytes.
312        count_within_budget::<T>(
313            indices.len(),
314            byte_budget,
315            indices.iter().map(|&i| values.get(i)),
316        )
317    }
318
319    fn num_values(&self) -> usize {
320        self.num_values
321    }
322
323    fn has_dictionary(&self) -> bool {
324        self.dict_encoder.is_some()
325    }
326
327    fn estimated_memory_size(&self) -> usize {
328        let encoder_size = self.encoder.estimated_memory_size();
329
330        let dict_encoder_size = self
331            .dict_encoder
332            .as_ref()
333            .map(|encoder| encoder.estimated_memory_size())
334            .unwrap_or_default();
335
336        let bloom_filter_size = self
337            .bloom_filter
338            .as_ref()
339            .map(|bf| bf.estimated_memory_size())
340            .unwrap_or_default();
341
342        encoder_size + dict_encoder_size + bloom_filter_size
343    }
344
345    fn estimated_dict_page_size(&self) -> Option<usize> {
346        Some(self.dict_encoder.as_ref()?.dict_encoded_size())
347    }
348
349    fn estimated_data_page_size(&self) -> usize {
350        match &self.dict_encoder {
351            Some(encoder) => encoder.estimated_data_encoded_size(),
352            _ => self.encoder.estimated_data_encoded_size(),
353        }
354    }
355
356    fn flush_dict_page(&mut self) -> Result<Option<DictionaryPage>> {
357        match self.dict_encoder.take() {
358            Some(encoder) => {
359                if self.num_values != 0 {
360                    return Err(general_err!(
361                        "Must flush data pages before flushing dictionary"
362                    ));
363                }
364
365                let buf = encoder.write_dict()?;
366
367                Ok(Some(DictionaryPage {
368                    buf,
369                    num_values: encoder.num_entries(),
370                    is_sorted: encoder.is_sorted(),
371                }))
372            }
373            _ => Ok(None),
374        }
375    }
376
377    fn flush_data_page(&mut self) -> Result<DataPageValues<T::T>> {
378        let (buf, encoding) = match &mut self.dict_encoder {
379            Some(encoder) => (encoder.write_indices()?, Encoding::RLE_DICTIONARY),
380            _ => (self.encoder.flush_buffer()?, self.encoder.encoding()),
381        };
382
383        Ok(DataPageValues {
384            buf,
385            encoding,
386            num_values: std::mem::take(&mut self.num_values),
387            min_value: self.min_value.take(),
388            max_value: self.max_value.take(),
389            variable_length_bytes: self.variable_length_bytes.take(),
390        })
391    }
392
393    fn flush_geospatial_statistics(&mut self) -> Option<Box<GeospatialStatistics>> {
394        self.geo_stats_accumulator.as_mut().map(|a| a.finish())?
395    }
396}
397
398fn get_min_max<'a, T, I>(descr: &ColumnDescriptor, mut iter: I) -> Option<(T, T)>
399where
400    T: ParquetValueType + 'a,
401    I: Iterator<Item = &'a T>,
402{
403    let first = loop {
404        let next = iter.next()?;
405        if !is_nan(descr, next) {
406            break next;
407        }
408    };
409
410    let mut min = first;
411    let mut max = first;
412    for val in iter {
413        if is_nan(descr, val) {
414            continue;
415        }
416        if compare_greater(descr, min, val) {
417            min = val;
418        }
419        if compare_greater(descr, val, max) {
420            max = val;
421        }
422    }
423
424    // Float/Double statistics have special case for zero.
425    //
426    // If computed min is zero, whether negative or positive,
427    // the spec states that the min should be written as -0.0
428    // (negative zero)
429    //
430    // For max, it has similar logic but will be written as 0.0
431    // (positive zero)
432    let min = replace_zero(min, descr, -0.0);
433    let max = replace_zero(max, descr, 0.0);
434
435    Some((min, max))
436}
437
438#[inline]
439fn replace_zero<T: ParquetValueType>(val: &T, descr: &ColumnDescriptor, replace: f32) -> T {
440    match T::PHYSICAL_TYPE {
441        Type::FLOAT if f32::from_le_bytes(val.as_bytes().try_into().unwrap()) == 0.0 => {
442            T::try_from_le_slice(&f32::to_le_bytes(replace)).unwrap()
443        }
444        Type::DOUBLE if f64::from_le_bytes(val.as_bytes().try_into().unwrap()) == 0.0 => {
445            T::try_from_le_slice(&f64::to_le_bytes(replace as f64)).unwrap()
446        }
447        Type::FIXED_LEN_BYTE_ARRAY
448            if descr.logical_type_ref() == Some(LogicalType::Float16).as_ref()
449                && f16::from_le_bytes(val.as_bytes().try_into().unwrap()) == f16::NEG_ZERO =>
450        {
451            T::try_from_le_slice(&f16::to_le_bytes(f16::from_f32(replace))).unwrap()
452        }
453        _ => val.clone(),
454    }
455}
456
457/// Creates a bloom filter sized for the column's configured NDV, returning the filter
458/// and the target FPP for folding.
459pub(crate) fn create_bloom_filter(
460    props: &WriterProperties,
461    descr: &ColumnDescPtr,
462) -> Result<(Option<Sbbf>, f64)> {
463    match props.bloom_filter_properties(descr.path()) {
464        Some(bf_props) => Ok((
465            Some(Sbbf::new_with_ndv_fpp(bf_props.ndv(), bf_props.fpp())?),
466            bf_props.fpp(),
467        )),
468        None => Ok((None, 0.0)),
469    }
470}
471
472fn update_geo_stats_accumulator<'a, T, I>(bounder: &mut dyn GeoStatsAccumulator, iter: I)
473where
474    T: ParquetValueType + 'a,
475    I: Iterator<Item = &'a T>,
476{
477    if bounder.is_valid() {
478        for val in iter {
479            bounder.update_wkb(val.as_bytes());
480        }
481    }
482}
483
484/// Plain-encoded byte cost of a single value of type `T::T`.
485///
486/// Derived from [`ParquetValueType::dict_encoding_size`] (which returns
487/// `(per-value overhead, value-bytes)`) so we don't add a parallel
488/// per-value-size hook to the trait. Mirrors the dispatch in
489/// `KeyStorage::push` (`encodings/encoding/dict_encoder.rs`).
490///
491/// Placed at the end of the module deliberately. Inserting it above the
492/// `ColumnValueEncoder` trait shifts the trait and `ColumnValueEncoderImpl`
493/// within the compiled module enough to perturb downstream code placement,
494/// which measurably regresses unrelated arrow-writer string benchmarks
495/// (~5-9% on `string` / `string_and_binary_view`). Defining it last keeps
496/// the hot encoder code at the offsets it has on `main`.
497#[inline]
498fn plain_encoded_byte_size<T: DataType>(value: &T::T) -> usize {
499    let (overhead, bytes) = value.dict_encoding_size();
500    match <T::T as ParquetValueType>::PHYSICAL_TYPE {
501        // Plain BYTE_ARRAY = 4-byte length prefix + payload.
502        Type::BYTE_ARRAY => overhead + bytes,
503        // Plain FLBA = raw bytes only; `dict_encoding_size`'s length prefix
504        // is irrelevant here, so the encoder passes `type_length` directly.
505        Type::FIXED_LEN_BYTE_ARRAY => bytes,
506        // Numeric/bool are short-circuited by the caller via
507        // `mem::size_of`, so this is unreachable in practice; fall back to
508        // `overhead` defensively.
509        _ => overhead,
510    }
511}
512
513/// How many leading values fit in `byte_budget` bytes, shared by the two
514/// `ColumnValueEncoder::count_values_within_byte_budget*` methods (one walks a
515/// contiguous slice, the other gathers by index).
516///
517/// `n` is the answer when everything fits; `vals` yields each candidate value,
518/// or `None` for a position that should still be counted but contributes no
519/// bytes (an out-of-range gather index). The boundary value that crosses the
520/// budget is included in the count so the caller's page-flush check trips on
521/// this mini-batch rather than leaving a sliver for the next page; this also
522/// catches a lone outlier wherever it lands among small values.
523///
524/// Defined at the end of the module alongside `plain_encoded_byte_size` for
525/// the same reason — see that function's note on code placement and the
526/// `string` / `string_and_binary_view` benchmarks.
527#[inline]
528fn count_within_budget<'a, T: DataType>(
529    n: usize,
530    byte_budget: usize,
531    vals: impl Iterator<Item = Option<&'a T::T>>,
532) -> Option<usize>
533where
534    T::T: 'a,
535{
536    // Fixed-size physical types have a constant per-value byte cost, so the
537    // answer is one division — no walk needed.
538    let phys = <T::T as ParquetValueType>::PHYSICAL_TYPE;
539    if phys != Type::BYTE_ARRAY && phys != Type::FIXED_LEN_BYTE_ARRAY {
540        let per = std::mem::size_of::<T::T>().max(1);
541        return Some((byte_budget / per).max(1).min(n));
542    }
543    // Variable-width: accumulate, exit at the first value past the budget.
544    let mut cum: usize = 0;
545    for (i, v) in vals.enumerate() {
546        if let Some(v) = v {
547            cum = cum.saturating_add(plain_encoded_byte_size::<T>(v));
548        }
549        if cum > byte_budget {
550            return Some(i + 1);
551        }
552    }
553    Some(n)
554}