// parquet/column/writer/encoder.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use bytes::Bytes;
19use half::f16;
20
21use crate::basic::{ConvertedType, Encoding, LogicalType, Type};
22use crate::bloom_filter::Sbbf;
23use crate::column::writer::{
24    compare_greater, fallback_encoding, has_dictionary_support, is_nan, update_max, update_min,
25};
26use crate::data_type::DataType;
27use crate::data_type::private::ParquetValueType;
28use crate::encodings::encoding::{DictEncoder, Encoder, get_encoder};
29use crate::errors::{ParquetError, Result};
30use crate::file::properties::{EnabledStatistics, WriterProperties};
31use crate::geospatial::accumulator::{GeoStatsAccumulator, try_new_geo_stats_accumulator};
32use crate::geospatial::statistics::GeospatialStatistics;
33use crate::schema::types::{ColumnDescPtr, ColumnDescriptor};
34
/// A collection of [`ParquetValueType`] encoded by a [`ColumnValueEncoder`]
pub trait ColumnValues {
    /// The number of values in this collection
    fn len(&self) -> usize;

    /// Returns `true` if this collection contains no values
    ///
    /// Provided method, so existing implementations remain valid
    fn is_empty(&self) -> bool {
        self.len() == 0
    }
}
40
#[cfg(feature = "arrow")]
impl ColumnValues for dyn arrow_array::Array {
    fn len(&self) -> usize {
        // Fully qualified call so this unambiguously dispatches to
        // `arrow_array::Array::len` rather than `ColumnValues::len`
        arrow_array::Array::len(self)
    }
}
47
48impl<T: ParquetValueType> ColumnValues for [T] {
49    fn len(&self) -> usize {
50        self.len()
51    }
52}
53
/// The encoded data for a dictionary page
pub struct DictionaryPage {
    /// The serialized dictionary page data
    pub buf: Bytes,
    /// The number of entries in the dictionary
    pub num_values: usize,
    /// Whether the dictionary entries are sorted
    pub is_sorted: bool,
}
60
/// The encoded values for a data page, with optional statistics
pub struct DataPageValues<T> {
    /// The encoded page data
    pub buf: Bytes,
    /// The number of values in this page
    pub num_values: usize,
    /// The encoding used for the page data
    pub encoding: Encoding,
    /// The minimum value in this page, if statistics were computed
    pub min_value: Option<T>,
    /// The maximum value in this page, if statistics were computed
    pub max_value: Option<T>,
    /// The total number of variable-length bytes, if tracked for this type
    pub variable_length_bytes: Option<i64>,
}
70
/// A generic encoder of [`ColumnValues`] to data and dictionary pages used by
/// [`super::GenericColumnWriter`]
pub trait ColumnValueEncoder {
    /// The underlying value type of [`Self::Values`]
    ///
    /// Note: this avoids needing to fully qualify `<Self::Values as ColumnValues>::T`
    type T: ParquetValueType;

    /// The values encoded by this encoder
    type Values: ColumnValues + ?Sized;

    /// Create a new [`ColumnValueEncoder`]
    fn try_new(descr: &ColumnDescPtr, props: &WriterProperties) -> Result<Self>
    where
        Self: Sized;

    /// Write the corresponding values to this [`ColumnValueEncoder`]
    fn write(&mut self, values: &Self::Values, offset: usize, len: usize) -> Result<()>;

    /// Write the values at the indexes in `indices` to this [`ColumnValueEncoder`]
    fn write_gather(&mut self, values: &Self::Values, indices: &[usize]) -> Result<()>;

    /// Returns the number of buffered values
    fn num_values(&self) -> usize;

    /// Returns true if this encoder has a dictionary page
    fn has_dictionary(&self) -> bool;

    /// Returns the estimated total memory usage of the encoder
    fn estimated_memory_size(&self) -> usize;

    /// Returns an estimate of the encoded size of dictionary page size in bytes, or `None` if no dictionary
    fn estimated_dict_page_size(&self) -> Option<usize>;

    /// Returns an estimate of the encoded data page size in bytes
    ///
    /// This should include:
    /// <already_written_encoded_byte_size> + <estimated_encoded_size_of_unflushed_bytes>
    fn estimated_data_page_size(&self) -> usize;

    /// Flush the dictionary page for this column chunk if any. Any subsequent calls to
    /// [`Self::write`] will not be dictionary encoded
    ///
    /// Note: [`Self::flush_data_page`] must be called first, as this will error if there
    /// are any pending page values
    fn flush_dict_page(&mut self) -> Result<Option<DictionaryPage>>;

    /// Flush the next data page for this column chunk
    fn flush_data_page(&mut self) -> Result<DataPageValues<Self::T>>;

    /// Flushes bloom filter if enabled and returns it, otherwise returns `None`. Subsequent writes
    /// will *not* be tracked by the bloom filter, as it has been taken. This should be called once
    /// near the end of encoding.
    fn flush_bloom_filter(&mut self) -> Option<Sbbf>;

    /// Computes [`GeospatialStatistics`], if any, and resets internal state such that any internal
    /// accumulator is prepared to accumulate statistics for the next column chunk.
    fn flush_geospatial_statistics(&mut self) -> Option<Box<GeospatialStatistics>>;
}
131
/// A [`ColumnValueEncoder`] for slices of `T::T`, with optional dictionary
/// encoding, min/max statistics, bloom filter, and geospatial statistics
pub struct ColumnValueEncoderImpl<T: DataType> {
    // Non-dictionary encoder, used when no dictionary or after dictionary fallback
    encoder: Box<dyn Encoder<T>>,
    // Dictionary encoder, if dictionary encoding is enabled and supported
    dict_encoder: Option<DictEncoder<T>>,
    // Descriptor of the column being encoded
    descr: ColumnDescPtr,
    // Number of values buffered since the last data page flush
    num_values: usize,
    // Statistics level configured for this column
    statistics_enabled: EnabledStatistics,
    // Running minimum for the current page, if any non-NaN value was seen
    min_value: Option<T::T>,
    // Running maximum for the current page, if any non-NaN value was seen
    max_value: Option<T::T>,
    // Bloom filter being populated, if enabled
    bloom_filter: Option<Sbbf>,
    // Target false-positive probability the bloom filter is folded to on flush
    bloom_filter_target_fpp: f64,
    // Running total of variable-length bytes for the current page, if tracked
    variable_length_bytes: Option<i64>,
    // Accumulator for geospatial statistics, if this is a geo column
    geo_stats_accumulator: Option<Box<dyn GeoStatsAccumulator>>,
}
145
impl<T: DataType> ColumnValueEncoderImpl<T> {
    /// Computes the (min, max) of `values`, restricted to the positions in
    /// `value_indices` when provided. Returns `None` if there are no eligible
    /// values (e.g. all values are NaN).
    fn min_max(&self, values: &[T::T], value_indices: Option<&[usize]>) -> Option<(T::T, T::T)> {
        match value_indices {
            Some(indices) => get_min_max(&self.descr, indices.iter().map(|x| &values[*x])),
            None => get_min_max(&self.descr, values.iter()),
        }
    }

    /// Updates statistics and the bloom filter for `slice`, then forwards it
    /// to the dictionary encoder if present, or the value encoder otherwise.
    fn write_slice(&mut self, slice: &[T::T]) -> Result<()> {
        if self.statistics_enabled != EnabledStatistics::None
            // INTERVAL, Geometry, and Geography have undefined sort order, so don't write min/max stats for them
            && self.descr.converted_type() != ConvertedType::INTERVAL
        {
            // Geometry/Geography columns have a geo accumulator instead of min/max;
            // its presence suppresses ordinary min/max accumulation
            if let Some(accumulator) = self.geo_stats_accumulator.as_deref_mut() {
                update_geo_stats_accumulator(accumulator, slice.iter());
            } else if let Some((min, max)) = self.min_max(slice, None) {
                update_min(&self.descr, &min, &mut self.min_value);
                update_max(&self.descr, &max, &mut self.max_value);
            }

            // Accumulate variable-length byte count when the type reports one
            if let Some(var_bytes) = T::T::variable_length_bytes(slice) {
                *self.variable_length_bytes.get_or_insert(0) += var_bytes;
            }
        }

        // encode the values into bloom filter if enabled
        if let Some(bloom_filter) = &mut self.bloom_filter {
            for value in slice {
                bloom_filter.insert(value);
            }
        }

        match &mut self.dict_encoder {
            Some(encoder) => encoder.put(slice),
            _ => self.encoder.put(slice),
        }
    }
}
184
impl<T: DataType> ColumnValueEncoder for ColumnValueEncoderImpl<T> {
    type T = T::T;

    type Values = [T::T];

    /// Takes the bloom filter out of this encoder, folding it down to the
    /// configured target FPP first. Returns `None` if bloom filtering is
    /// disabled or the filter was already flushed.
    fn flush_bloom_filter(&mut self) -> Option<Sbbf> {
        let mut sbbf = self.bloom_filter.take()?;
        sbbf.fold_to_target_fpp(self.bloom_filter_target_fpp);
        Some(sbbf)
    }

    fn try_new(descr: &ColumnDescPtr, props: &WriterProperties) -> Result<Self> {
        // Dictionary encoding is used only when enabled for this column path
        // and supported for the physical type
        let dict_supported = props.dictionary_enabled(descr.path())
            && has_dictionary_support(T::get_physical_type(), props);
        let dict_encoder = dict_supported.then(|| DictEncoder::new(descr.clone()));

        // Set either main encoder or fallback encoder.
        let encoder = get_encoder(
            props
                .encoding(descr.path())
                .unwrap_or_else(|| fallback_encoding(T::get_physical_type(), props)),
            descr,
        )?;

        let statistics_enabled = props.statistics_enabled(descr.path());

        let (bloom_filter, bloom_filter_target_fpp) = create_bloom_filter(props, descr)?;

        let geo_stats_accumulator = try_new_geo_stats_accumulator(descr);

        Ok(Self {
            encoder,
            dict_encoder,
            descr: descr.clone(),
            num_values: 0,
            statistics_enabled,
            bloom_filter,
            bloom_filter_target_fpp,
            min_value: None,
            max_value: None,
            variable_length_bytes: None,
            geo_stats_accumulator,
        })
    }

    fn write(&mut self, values: &[T::T], offset: usize, len: usize) -> Result<()> {
        // NOTE(review): num_values is incremented before the range check below,
        // so it is advanced even when an error is returned — confirm callers
        // abandon the chunk on error
        self.num_values += len;

        let slice = values.get(offset..offset + len).ok_or_else(|| {
            general_err!(
                "Expected to write {} values, but have only {}",
                len,
                values.len() - offset
            )
        })?;

        self.write_slice(slice)
    }

    fn write_gather(&mut self, values: &Self::Values, indices: &[usize]) -> Result<()> {
        self.num_values += indices.len();
        // Materialize the gathered values so write_slice can see a contiguous slice
        let slice: Vec<_> = indices.iter().map(|idx| values[*idx].clone()).collect();
        self.write_slice(&slice)
    }

    fn num_values(&self) -> usize {
        self.num_values
    }

    fn has_dictionary(&self) -> bool {
        self.dict_encoder.is_some()
    }

    /// Sums the memory estimates of the value encoder, the dictionary encoder
    /// (if any), and the bloom filter (if any)
    fn estimated_memory_size(&self) -> usize {
        let encoder_size = self.encoder.estimated_memory_size();

        let dict_encoder_size = self
            .dict_encoder
            .as_ref()
            .map(|encoder| encoder.estimated_memory_size())
            .unwrap_or_default();

        let bloom_filter_size = self
            .bloom_filter
            .as_ref()
            .map(|bf| bf.estimated_memory_size())
            .unwrap_or_default();

        encoder_size + dict_encoder_size + bloom_filter_size
    }

    fn estimated_dict_page_size(&self) -> Option<usize> {
        Some(self.dict_encoder.as_ref()?.dict_encoded_size())
    }

    fn estimated_data_page_size(&self) -> usize {
        match &self.dict_encoder {
            Some(encoder) => encoder.estimated_data_encoded_size(),
            _ => self.encoder.estimated_data_encoded_size(),
        }
    }

    fn flush_dict_page(&mut self) -> Result<Option<DictionaryPage>> {
        // Taking the encoder disables dictionary encoding for subsequent writes
        match self.dict_encoder.take() {
            Some(encoder) => {
                // Buffered values would be lost: their indices reference the
                // dictionary being flushed
                if self.num_values != 0 {
                    return Err(general_err!(
                        "Must flush data pages before flushing dictionary"
                    ));
                }

                let buf = encoder.write_dict()?;

                Ok(Some(DictionaryPage {
                    buf,
                    num_values: encoder.num_entries(),
                    is_sorted: encoder.is_sorted(),
                }))
            }
            _ => Ok(None),
        }
    }

    fn flush_data_page(&mut self) -> Result<DataPageValues<T::T>> {
        let (buf, encoding) = match &mut self.dict_encoder {
            Some(encoder) => (encoder.write_indices()?, Encoding::RLE_DICTIONARY),
            _ => (self.encoder.flush_buffer()?, self.encoder.encoding()),
        };

        // take/mem::take reset the per-page state for the next page
        Ok(DataPageValues {
            buf,
            encoding,
            num_values: std::mem::take(&mut self.num_values),
            min_value: self.min_value.take(),
            max_value: self.max_value.take(),
            variable_length_bytes: self.variable_length_bytes.take(),
        })
    }

    fn flush_geospatial_statistics(&mut self) -> Option<Box<GeospatialStatistics>> {
        // `?` flattens Option<Option<_>>: None when there is no accumulator,
        // or when the accumulator produced no statistics
        self.geo_stats_accumulator.as_mut().map(|a| a.finish())?
    }
}
328
329fn get_min_max<'a, T, I>(descr: &ColumnDescriptor, mut iter: I) -> Option<(T, T)>
330where
331    T: ParquetValueType + 'a,
332    I: Iterator<Item = &'a T>,
333{
334    let first = loop {
335        let next = iter.next()?;
336        if !is_nan(descr, next) {
337            break next;
338        }
339    };
340
341    let mut min = first;
342    let mut max = first;
343    for val in iter {
344        if is_nan(descr, val) {
345            continue;
346        }
347        if compare_greater(descr, min, val) {
348            min = val;
349        }
350        if compare_greater(descr, val, max) {
351            max = val;
352        }
353    }
354
355    // Float/Double statistics have special case for zero.
356    //
357    // If computed min is zero, whether negative or positive,
358    // the spec states that the min should be written as -0.0
359    // (negative zero)
360    //
361    // For max, it has similar logic but will be written as 0.0
362    // (positive zero)
363    let min = replace_zero(min, descr, -0.0);
364    let max = replace_zero(max, descr, 0.0);
365
366    Some((min, max))
367}
368
/// Returns a copy of `val` with any zero replaced by `replace` (interpreted in
/// the value's own float width), implementing the canonical ±0.0 encoding for
/// float min/max statistics. Non-float values are returned unchanged.
#[inline]
fn replace_zero<T: ParquetValueType>(val: &T, descr: &ColumnDescriptor, replace: f32) -> T {
    match T::PHYSICAL_TYPE {
        // IEEE equality: `== 0.0` matches both +0.0 and -0.0
        Type::FLOAT if f32::from_le_bytes(val.as_bytes().try_into().unwrap()) == 0.0 => {
            T::try_from_le_slice(&f32::to_le_bytes(replace)).unwrap()
        }
        Type::DOUBLE if f64::from_le_bytes(val.as_bytes().try_into().unwrap()) == 0.0 => {
            T::try_from_le_slice(&f64::to_le_bytes(replace as f64)).unwrap()
        }
        // FIXED_LEN_BYTE_ARRAY is only rewritten when it logically holds a Float16;
        // the comparison against NEG_ZERO presumably follows IEEE equality and so
        // also matches +0.0 — TODO confirm against the `half` crate semantics
        Type::FIXED_LEN_BYTE_ARRAY
            if descr.logical_type_ref() == Some(LogicalType::Float16).as_ref()
                && f16::from_le_bytes(val.as_bytes().try_into().unwrap()) == f16::NEG_ZERO =>
        {
            T::try_from_le_slice(&f16::to_le_bytes(f16::from_f32(replace))).unwrap()
        }
        _ => val.clone(),
    }
}
387
388/// Creates a bloom filter sized for the column's configured NDV, returning the filter
389/// and the target FPP for folding.
390pub(crate) fn create_bloom_filter(
391    props: &WriterProperties,
392    descr: &ColumnDescPtr,
393) -> Result<(Option<Sbbf>, f64)> {
394    match props.bloom_filter_properties(descr.path()) {
395        Some(bf_props) => Ok((
396            Some(Sbbf::new_with_ndv_fpp(bf_props.ndv, bf_props.fpp)?),
397            bf_props.fpp,
398        )),
399        None => Ok((None, 0.0)),
400    }
401}
402
403fn update_geo_stats_accumulator<'a, T, I>(bounder: &mut dyn GeoStatsAccumulator, iter: I)
404where
405    T: ParquetValueType + 'a,
406    I: Iterator<Item = &'a T>,
407{
408    if bounder.is_valid() {
409        for val in iter {
410            bounder.update_wkb(val.as_bytes());
411        }
412    }
413}