1use bytes::Bytes;
19use half::f16;
20
21use crate::basic::{ConvertedType, Encoding, LogicalType, Type};
22use crate::bloom_filter::Sbbf;
23use crate::column::writer::{
24 compare_greater, fallback_encoding, has_dictionary_support, is_nan, update_max, update_min,
25};
26use crate::data_type::private::ParquetValueType;
27use crate::data_type::DataType;
28use crate::encodings::encoding::{get_encoder, DictEncoder, Encoder};
29use crate::errors::{ParquetError, Result};
30use crate::file::properties::{EnabledStatistics, WriterProperties};
31use crate::schema::types::{ColumnDescPtr, ColumnDescriptor};
32
/// A collection of values to be written to a column, abstracting over the
/// concrete storage (a typed slice, or an arrow array when the `arrow`
/// feature is enabled).
pub trait ColumnValues {
    /// Returns the number of values in this collection.
    fn len(&self) -> usize;
}
38
#[cfg(feature = "arrow")]
impl ColumnValues for dyn arrow_array::Array {
    fn len(&self) -> usize {
        // Fully-qualified call disambiguates from `ColumnValues::len` itself.
        arrow_array::Array::len(self)
    }
}
45
impl<T: ParquetValueType> ColumnValues for [T] {
    fn len(&self) -> usize {
        // Resolves to the inherent `[T]::len`, which takes precedence over
        // the trait method, so this does not recurse.
        self.len()
    }
}
51
/// The encoded contents of a dictionary page, produced by flushing a
/// dictionary encoder.
pub struct DictionaryPage {
    /// The encoded dictionary entries.
    pub buf: Bytes,
    /// The number of entries in the dictionary.
    pub num_values: usize,
    /// Whether the dictionary entries are sorted.
    pub is_sorted: bool,
}
58
/// The encoded values of a data page, together with the statistics gathered
/// while those values were written.
pub struct DataPageValues<T> {
    /// The encoded values.
    pub buf: Bytes,
    /// The number of values encoded into `buf`.
    pub num_values: usize,
    /// The encoding used to produce `buf`.
    pub encoding: Encoding,
    /// The minimum value written to the page, if statistics were collected.
    pub min_value: Option<T>,
    /// The maximum value written to the page, if statistics were collected.
    pub max_value: Option<T>,
    /// Total variable-length bytes written, if applicable to the value type.
    pub variable_length_bytes: Option<i64>,
}
68
/// A strategy for encoding a column's values into data and dictionary pages.
pub trait ColumnValueEncoder {
    /// The underlying parquet value type written by this encoder.
    type T: ParquetValueType;

    /// The type of the value collection accepted by [`Self::write`].
    type Values: ColumnValues + ?Sized;

    /// Creates an encoder for the column described by `descr`, configured
    /// according to `props`.
    fn try_new(descr: &ColumnDescPtr, props: &WriterProperties) -> Result<Self>
    where
        Self: Sized;

    /// Encodes `len` values from `values`, starting at `offset`.
    fn write(&mut self, values: &Self::Values, offset: usize, len: usize) -> Result<()>;

    /// Encodes the values of `values` selected by `indices`.
    fn write_gather(&mut self, values: &Self::Values, indices: &[usize]) -> Result<()>;

    /// Returns the number of values written since the last data page flush.
    fn num_values(&self) -> usize;

    /// Returns true if this encoder is currently building a dictionary.
    fn has_dictionary(&self) -> bool;

    /// Returns an estimate of the total memory used by this encoder.
    fn estimated_memory_size(&self) -> usize;

    /// Returns an estimate of the encoded dictionary page size, if a
    /// dictionary encoder is active.
    fn estimated_dict_page_size(&self) -> Option<usize>;

    /// Returns an estimate of the encoded size of the current data page.
    fn estimated_data_page_size(&self) -> usize;

    /// Flushes the buffered dictionary, if any, returning its page contents.
    ///
    /// Errors if values are still buffered for a data page; subsequent
    /// writes fall back to the non-dictionary encoding.
    fn flush_dict_page(&mut self) -> Result<Option<DictionaryPage>>;

    /// Flushes the buffered values into a data page, resetting the per-page
    /// state (value count and statistics).
    fn flush_data_page(&mut self) -> Result<DataPageValues<Self::T>>;

    /// Takes the bloom filter accumulated so far, if one was configured.
    fn flush_bloom_filter(&mut self) -> Option<Sbbf>;
}
125
/// The default [`ColumnValueEncoder`], operating on typed slices of values.
pub struct ColumnValueEncoderImpl<T: DataType> {
    /// Fallback encoder, used when no dictionary encoder is active.
    encoder: Box<dyn Encoder<T>>,
    /// Dictionary encoder, present while dictionary encoding is in use.
    dict_encoder: Option<DictEncoder<T>>,
    /// Descriptor of the column being written.
    descr: ColumnDescPtr,
    /// Number of values written since the last data page flush.
    num_values: usize,
    /// Statistics collection level configured for this column.
    statistics_enabled: EnabledStatistics,
    /// Running minimum for the current page, when statistics are enabled.
    min_value: Option<T::T>,
    /// Running maximum for the current page, when statistics are enabled.
    max_value: Option<T::T>,
    /// Bloom filter being accumulated, if configured for this column.
    bloom_filter: Option<Sbbf>,
    /// Running total of variable-length bytes written, if applicable.
    variable_length_bytes: Option<i64>,
}
137
impl<T: DataType> ColumnValueEncoderImpl<T> {
    /// Computes the (min, max) of `values`, or of the subset selected by
    /// `value_indices`, skipping NaNs. Returns `None` if no usable value
    /// exists.
    fn min_max(&self, values: &[T::T], value_indices: Option<&[usize]>) -> Option<(T::T, T::T)> {
        match value_indices {
            Some(indices) => get_min_max(&self.descr, indices.iter().map(|x| &values[*x])),
            None => get_min_max(&self.descr, values.iter()),
        }
    }

    /// Encodes `slice`, updating page statistics and the bloom filter along
    /// the way.
    fn write_slice(&mut self, slice: &[T::T]) -> Result<()> {
        if self.statistics_enabled != EnabledStatistics::None
            // Min/max statistics are not collected for INTERVAL columns
            // (their sort order is unspecified in the parquet format).
            && self.descr.converted_type() != ConvertedType::INTERVAL
        {
            if let Some((min, max)) = self.min_max(slice, None) {
                update_min(&self.descr, &min, &mut self.min_value);
                update_max(&self.descr, &max, &mut self.max_value);
            }

            // Accumulate the variable-length byte count when the value type
            // reports one (e.g. byte arrays).
            if let Some(var_bytes) = T::T::variable_length_bytes(slice) {
                *self.variable_length_bytes.get_or_insert(0) += var_bytes;
            }
        }

        if let Some(bloom_filter) = &mut self.bloom_filter {
            for value in slice {
                bloom_filter.insert(value);
            }
        }

        // Prefer the dictionary encoder while it is still active.
        match &mut self.dict_encoder {
            Some(encoder) => encoder.put(slice),
            _ => self.encoder.put(slice),
        }
    }
}
174
175impl<T: DataType> ColumnValueEncoder for ColumnValueEncoderImpl<T> {
176 type T = T::T;
177
178 type Values = [T::T];
179
180 fn flush_bloom_filter(&mut self) -> Option<Sbbf> {
181 self.bloom_filter.take()
182 }
183
184 fn try_new(descr: &ColumnDescPtr, props: &WriterProperties) -> Result<Self> {
185 let dict_supported = props.dictionary_enabled(descr.path())
186 && has_dictionary_support(T::get_physical_type(), props);
187 let dict_encoder = dict_supported.then(|| DictEncoder::new(descr.clone()));
188
189 let encoder = get_encoder(
191 props
192 .encoding(descr.path())
193 .unwrap_or_else(|| fallback_encoding(T::get_physical_type(), props)),
194 descr,
195 )?;
196
197 let statistics_enabled = props.statistics_enabled(descr.path());
198
199 let bloom_filter = props
200 .bloom_filter_properties(descr.path())
201 .map(|props| Sbbf::new_with_ndv_fpp(props.ndv, props.fpp))
202 .transpose()?;
203
204 Ok(Self {
205 encoder,
206 dict_encoder,
207 descr: descr.clone(),
208 num_values: 0,
209 statistics_enabled,
210 bloom_filter,
211 min_value: None,
212 max_value: None,
213 variable_length_bytes: None,
214 })
215 }
216
217 fn write(&mut self, values: &[T::T], offset: usize, len: usize) -> Result<()> {
218 self.num_values += len;
219
220 let slice = values.get(offset..offset + len).ok_or_else(|| {
221 general_err!(
222 "Expected to write {} values, but have only {}",
223 len,
224 values.len() - offset
225 )
226 })?;
227
228 self.write_slice(slice)
229 }
230
231 fn write_gather(&mut self, values: &Self::Values, indices: &[usize]) -> Result<()> {
232 self.num_values += indices.len();
233 let slice: Vec<_> = indices.iter().map(|idx| values[*idx].clone()).collect();
234 self.write_slice(&slice)
235 }
236
237 fn num_values(&self) -> usize {
238 self.num_values
239 }
240
241 fn has_dictionary(&self) -> bool {
242 self.dict_encoder.is_some()
243 }
244
245 fn estimated_memory_size(&self) -> usize {
246 let encoder_size = self.encoder.estimated_memory_size();
247
248 let dict_encoder_size = self
249 .dict_encoder
250 .as_ref()
251 .map(|encoder| encoder.estimated_memory_size())
252 .unwrap_or_default();
253
254 let bloom_filter_size = self
255 .bloom_filter
256 .as_ref()
257 .map(|bf| bf.estimated_memory_size())
258 .unwrap_or_default();
259
260 encoder_size + dict_encoder_size + bloom_filter_size
261 }
262
263 fn estimated_dict_page_size(&self) -> Option<usize> {
264 Some(self.dict_encoder.as_ref()?.dict_encoded_size())
265 }
266
267 fn estimated_data_page_size(&self) -> usize {
268 match &self.dict_encoder {
269 Some(encoder) => encoder.estimated_data_encoded_size(),
270 _ => self.encoder.estimated_data_encoded_size(),
271 }
272 }
273
274 fn flush_dict_page(&mut self) -> Result<Option<DictionaryPage>> {
275 match self.dict_encoder.take() {
276 Some(encoder) => {
277 if self.num_values != 0 {
278 return Err(general_err!(
279 "Must flush data pages before flushing dictionary"
280 ));
281 }
282
283 let buf = encoder.write_dict()?;
284
285 Ok(Some(DictionaryPage {
286 buf,
287 num_values: encoder.num_entries(),
288 is_sorted: encoder.is_sorted(),
289 }))
290 }
291 _ => Ok(None),
292 }
293 }
294
295 fn flush_data_page(&mut self) -> Result<DataPageValues<T::T>> {
296 let (buf, encoding) = match &mut self.dict_encoder {
297 Some(encoder) => (encoder.write_indices()?, Encoding::RLE_DICTIONARY),
298 _ => (self.encoder.flush_buffer()?, self.encoder.encoding()),
299 };
300
301 Ok(DataPageValues {
302 buf,
303 encoding,
304 num_values: std::mem::take(&mut self.num_values),
305 min_value: self.min_value.take(),
306 max_value: self.max_value.take(),
307 variable_length_bytes: self.variable_length_bytes.take(),
308 })
309 }
310}
311
312fn get_min_max<'a, T, I>(descr: &ColumnDescriptor, mut iter: I) -> Option<(T, T)>
313where
314 T: ParquetValueType + 'a,
315 I: Iterator<Item = &'a T>,
316{
317 let first = loop {
318 let next = iter.next()?;
319 if !is_nan(descr, next) {
320 break next;
321 }
322 };
323
324 let mut min = first;
325 let mut max = first;
326 for val in iter {
327 if is_nan(descr, val) {
328 continue;
329 }
330 if compare_greater(descr, min, val) {
331 min = val;
332 }
333 if compare_greater(descr, val, max) {
334 max = val;
335 }
336 }
337
338 let min = replace_zero(min, descr, -0.0);
347 let max = replace_zero(max, descr, 0.0);
348
349 Some((min, max))
350}
351
/// Canonicalizes a floating point zero statistic: since `-0.0 == +0.0` under
/// IEEE 754 comparison, a zero bound is rewritten to the sign supplied via
/// `replace` (the caller passes `-0.0` for a minimum and `+0.0` for a
/// maximum) so the bound covers both signed zeros. Non-zero and
/// non-floating-point values are returned unchanged (cloned).
#[inline]
fn replace_zero<T: ParquetValueType>(val: &T, descr: &ColumnDescriptor, replace: f32) -> T {
    match T::PHYSICAL_TYPE {
        // `== 0.0` matches both +0.0 and -0.0.
        Type::FLOAT if f32::from_le_bytes(val.as_bytes().try_into().unwrap()) == 0.0 => {
            T::try_from_le_slice(&f32::to_le_bytes(replace)).unwrap()
        }
        Type::DOUBLE if f64::from_le_bytes(val.as_bytes().try_into().unwrap()) == 0.0 => {
            T::try_from_le_slice(&f64::to_le_bytes(replace as f64)).unwrap()
        }
        // Float16 is stored as FIXED_LEN_BYTE_ARRAY; `== f16::NEG_ZERO` also
        // matches +0.0 under IEEE comparison semantics.
        Type::FIXED_LEN_BYTE_ARRAY
            if descr.logical_type() == Some(LogicalType::Float16)
                && f16::from_le_bytes(val.as_bytes().try_into().unwrap()) == f16::NEG_ZERO =>
        {
            T::try_from_le_slice(&f16::to_le_bytes(f16::from_f32(replace))).unwrap()
        }
        _ => val.clone(),
    }
}
369}