parquet/column/writer/encoder.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

use bytes::Bytes;
use half::f16;

use crate::basic::{ConvertedType, Encoding, LogicalType, Type};
use crate::bloom_filter::Sbbf;
use crate::column::writer::{
    compare_greater, fallback_encoding, has_dictionary_support, is_nan, update_max, update_min,
};
use crate::data_type::private::ParquetValueType;
use crate::data_type::DataType;
use crate::encodings::encoding::{get_encoder, DictEncoder, Encoder};
use crate::errors::{ParquetError, Result};
use crate::file::properties::{EnabledStatistics, WriterProperties};
use crate::schema::types::{ColumnDescPtr, ColumnDescriptor};

/// A collection of [`ParquetValueType`] encoded by a [`ColumnValueEncoder`]
pub trait ColumnValues {
    /// The number of values in this collection
    fn len(&self) -> usize;
}

#[cfg(feature = "arrow")]
impl ColumnValues for dyn arrow_array::Array {
    fn len(&self) -> usize {
        arrow_array::Array::len(self)
    }
}

impl<T: ParquetValueType> ColumnValues for [T] {
    fn len(&self) -> usize {
        self.len()
    }
}

/// The encoded data for a dictionary page
pub struct DictionaryPage {
    /// The encoded dictionary values
    pub buf: Bytes,
    /// The number of entries in the dictionary
    pub num_values: usize,
    /// Whether the dictionary entries are sorted
    pub is_sorted: bool,
}

/// The encoded values for a data page, with optional statistics
pub struct DataPageValues<T> {
    /// The encoded values
    pub buf: Bytes,
    /// The number of values encoded in `buf`
    pub num_values: usize,
    /// The encoding used for `buf`
    pub encoding: Encoding,
    /// The minimum value, if statistics were collected
    pub min_value: Option<T>,
    /// The maximum value, if statistics were collected
    pub max_value: Option<T>,
    /// The total number of bytes of variable-length values, if tracked
    pub variable_length_bytes: Option<i64>,
}

/// A generic encoder of [`ColumnValues`] to data and dictionary pages used by
/// [`super::GenericColumnWriter`]
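///
/// # Example
///
/// A minimal sketch of the intended call sequence (hypothetical driver code;
/// assumes a `ColumnDescPtr` `descr` and `WriterProperties` `props` supplied
/// by the surrounding writer):
///
/// ```ignore
/// let mut encoder = ColumnValueEncoderImpl::<Int32Type>::try_new(&descr, &props)?;
/// encoder.write(&[1, 2, 3], 0, 3)?; // buffer three values
/// let page = encoder.flush_data_page()?; // drain buffered values into a data page
/// let dict = encoder.flush_dict_page()?; // then flush the dictionary page, if any
/// ```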
pub trait ColumnValueEncoder {
    /// The underlying value type of [`Self::Values`]
    ///
    /// Note: this avoids needing to fully qualify `<Self::Values as ColumnValues>::T`
    type T: ParquetValueType;

    /// The values encoded by this encoder
    type Values: ColumnValues + ?Sized;

    /// Create a new [`ColumnValueEncoder`]
    fn try_new(descr: &ColumnDescPtr, props: &WriterProperties) -> Result<Self>
    where
        Self: Sized;

    /// Write the corresponding values to this [`ColumnValueEncoder`]
    fn write(&mut self, values: &Self::Values, offset: usize, len: usize) -> Result<()>;

    /// Write the values at the indexes in `indices` to this [`ColumnValueEncoder`]
    fn write_gather(&mut self, values: &Self::Values, indices: &[usize]) -> Result<()>;

    /// Returns the number of buffered values
    fn num_values(&self) -> usize;

    /// Returns true if this encoder has a dictionary page
    fn has_dictionary(&self) -> bool;

    /// Returns the estimated total memory usage of the encoder
    fn estimated_memory_size(&self) -> usize;

    /// Returns an estimate of the encoded size of the dictionary page in bytes,
    /// or `None` if there is no dictionary
    fn estimated_dict_page_size(&self) -> Option<usize>;

    /// Returns an estimate of the encoded data page size in bytes
    ///
    /// This should include:
    /// <already_written_encoded_byte_size> + <estimated_encoded_size_of_unflushed_bytes>
    fn estimated_data_page_size(&self) -> usize;

    /// Flush the dictionary page for this column chunk, if any. Any subsequent calls to
    /// [`Self::write`] will not be dictionary encoded
    ///
    /// Note: [`Self::flush_data_page`] must be called first, as this will error if there
    /// are any pending page values
    fn flush_dict_page(&mut self) -> Result<Option<DictionaryPage>>;

    /// Flush the next data page for this column chunk
    fn flush_data_page(&mut self) -> Result<DataPageValues<Self::T>>;

    /// Flushes the bloom filter if enabled and returns it, otherwise returns `None`.
    /// Subsequent writes will *not* be tracked by the bloom filter, as it is taken by
    /// this call. This should be called once, near the end of encoding.
    fn flush_bloom_filter(&mut self) -> Option<Sbbf>;
}

pub struct ColumnValueEncoderImpl<T: DataType> {
    encoder: Box<dyn Encoder<T>>,
    dict_encoder: Option<DictEncoder<T>>,
    descr: ColumnDescPtr,
    num_values: usize,
    statistics_enabled: EnabledStatistics,
    min_value: Option<T::T>,
    max_value: Option<T::T>,
    bloom_filter: Option<Sbbf>,
    variable_length_bytes: Option<i64>,
}

impl<T: DataType> ColumnValueEncoderImpl<T> {
    fn min_max(&self, values: &[T::T], value_indices: Option<&[usize]>) -> Option<(T::T, T::T)> {
        match value_indices {
            Some(indices) => get_min_max(&self.descr, indices.iter().map(|x| &values[*x])),
            None => get_min_max(&self.descr, values.iter()),
        }
    }

    fn write_slice(&mut self, slice: &[T::T]) -> Result<()> {
        if self.statistics_enabled != EnabledStatistics::None
            // INTERVAL has undefined sort order, so don't write min/max stats for it
            && self.descr.converted_type() != ConvertedType::INTERVAL
        {
            if let Some((min, max)) = self.min_max(slice, None) {
                update_min(&self.descr, &min, &mut self.min_value);
                update_max(&self.descr, &max, &mut self.max_value);
            }

            if let Some(var_bytes) = T::T::variable_length_bytes(slice) {
                *self.variable_length_bytes.get_or_insert(0) += var_bytes;
            }
        }

        // encode the values into the bloom filter if enabled
        if let Some(bloom_filter) = &mut self.bloom_filter {
            for value in slice {
                bloom_filter.insert(value);
            }
        }

        match &mut self.dict_encoder {
            Some(encoder) => encoder.put(slice),
            _ => self.encoder.put(slice),
        }
    }
}

impl<T: DataType> ColumnValueEncoder for ColumnValueEncoderImpl<T> {
    type T = T::T;

    type Values = [T::T];

    fn flush_bloom_filter(&mut self) -> Option<Sbbf> {
        self.bloom_filter.take()
    }

    fn try_new(descr: &ColumnDescPtr, props: &WriterProperties) -> Result<Self> {
        let dict_supported = props.dictionary_enabled(descr.path())
            && has_dictionary_support(T::get_physical_type(), props);
        let dict_encoder = dict_supported.then(|| DictEncoder::new(descr.clone()));

        // Set either the main encoder or the fallback encoder.
        let encoder = get_encoder(
            props
                .encoding(descr.path())
                .unwrap_or_else(|| fallback_encoding(T::get_physical_type(), props)),
            descr,
        )?;

        let statistics_enabled = props.statistics_enabled(descr.path());

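        // Create the bloom filter up front if enabled for this column path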
        let bloom_filter = props
            .bloom_filter_properties(descr.path())
            .map(|props| Sbbf::new_with_ndv_fpp(props.ndv, props.fpp))
            .transpose()?;

        Ok(Self {
            encoder,
            dict_encoder,
            descr: descr.clone(),
            num_values: 0,
            statistics_enabled,
            bloom_filter,
            min_value: None,
            max_value: None,
            variable_length_bytes: None,
        })
    }

    fn write(&mut self, values: &[T::T], offset: usize, len: usize) -> Result<()> {
        self.num_values += len;

        let slice = values.get(offset..offset + len).ok_or_else(|| {
            general_err!(
                "Expected to write {} values, but have only {}",
                len,
                values.len() - offset
            )
        })?;

        self.write_slice(slice)
    }

    fn write_gather(&mut self, values: &Self::Values, indices: &[usize]) -> Result<()> {
        self.num_values += indices.len();
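        // Gather the selected values into a contiguous buffer so they can be
        // encoded as a regular slice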
        let slice: Vec<_> = indices.iter().map(|idx| values[*idx].clone()).collect();
        self.write_slice(&slice)
    }

    fn num_values(&self) -> usize {
        self.num_values
    }

    fn has_dictionary(&self) -> bool {
        self.dict_encoder.is_some()
    }

    fn estimated_memory_size(&self) -> usize {
        let encoder_size = self.encoder.estimated_memory_size();

        let dict_encoder_size = self
            .dict_encoder
            .as_ref()
            .map(|encoder| encoder.estimated_memory_size())
            .unwrap_or_default();

        let bloom_filter_size = self
            .bloom_filter
            .as_ref()
            .map(|bf| bf.estimated_memory_size())
            .unwrap_or_default();

        encoder_size + dict_encoder_size + bloom_filter_size
    }

    fn estimated_dict_page_size(&self) -> Option<usize> {
        Some(self.dict_encoder.as_ref()?.dict_encoded_size())
    }

    fn estimated_data_page_size(&self) -> usize {
        match &self.dict_encoder {
            Some(encoder) => encoder.estimated_data_encoded_size(),
            _ => self.encoder.estimated_data_encoded_size(),
        }
    }

    fn flush_dict_page(&mut self) -> Result<Option<DictionaryPage>> {
        match self.dict_encoder.take() {
            Some(encoder) => {
                if self.num_values != 0 {
                    return Err(general_err!(
                        "Must flush data pages before flushing dictionary"
                    ));
                }

                let buf = encoder.write_dict()?;

                Ok(Some(DictionaryPage {
                    buf,
                    num_values: encoder.num_entries(),
                    is_sorted: encoder.is_sorted(),
                }))
            }
            _ => Ok(None),
        }
    }

    fn flush_data_page(&mut self) -> Result<DataPageValues<T::T>> {
        let (buf, encoding) = match &mut self.dict_encoder {
            Some(encoder) => (encoder.write_indices()?, Encoding::RLE_DICTIONARY),
            _ => (self.encoder.flush_buffer()?, self.encoder.encoding()),
        };

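        // Taking each field resets the per-page state (value count, statistics)
        // ready for the next page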
        Ok(DataPageValues {
            buf,
            encoding,
            num_values: std::mem::take(&mut self.num_values),
            min_value: self.min_value.take(),
            max_value: self.max_value.take(),
            variable_length_bytes: self.variable_length_bytes.take(),
        })
    }
}

fn get_min_max<'a, T, I>(descr: &ColumnDescriptor, mut iter: I) -> Option<(T, T)>
where
    T: ParquetValueType + 'a,
    I: Iterator<Item = &'a T>,
{
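    // Skip any leading NaN values so the min/max scan starts from a comparable value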
    let first = loop {
        let next = iter.next()?;
        if !is_nan(descr, next) {
            break next;
        }
    };

    let mut min = first;
    let mut max = first;
    for val in iter {
        if is_nan(descr, val) {
            continue;
        }
        if compare_greater(descr, min, val) {
            min = val;
        }
        if compare_greater(descr, val, max) {
            max = val;
        }
    }

    // Float/Double statistics have a special case for zero.
    //
    // If the computed min is zero, whether negative or positive,
    // the spec states that the min should be written as -0.0
    // (negative zero)
    //
    // For max, similar logic applies, but it will be written as 0.0
    // (positive zero)
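    //
    // e.g. a page whose values are all 0.0 is written with min = -0.0 and max = 0.0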
    let min = replace_zero(min, descr, -0.0);
    let max = replace_zero(max, descr, 0.0);

    Some((min, max))
}

#[inline]
fn replace_zero<T: ParquetValueType>(val: &T, descr: &ColumnDescriptor, replace: f32) -> T {
    match T::PHYSICAL_TYPE {
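        // Note: IEEE 754 equality treats -0.0 and +0.0 as equal, so each of the
        // comparisons below matches a zero of either sign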
        Type::FLOAT if f32::from_le_bytes(val.as_bytes().try_into().unwrap()) == 0.0 => {
            T::try_from_le_slice(&f32::to_le_bytes(replace)).unwrap()
        }
        Type::DOUBLE if f64::from_le_bytes(val.as_bytes().try_into().unwrap()) == 0.0 => {
            T::try_from_le_slice(&f64::to_le_bytes(replace as f64)).unwrap()
        }
        Type::FIXED_LEN_BYTE_ARRAY
            if descr.logical_type() == Some(LogicalType::Float16)
                && f16::from_le_bytes(val.as_bytes().try_into().unwrap()) == f16::NEG_ZERO =>
        {
            T::try_from_le_slice(&f16::to_le_bytes(f16::from_f32(replace))).unwrap()
        }
        _ => val.clone(),
    }
}