1use bytes::Bytes;
19use half::f16;
20
21use crate::basic::{ConvertedType, Encoding, LogicalType, Type};
22use crate::bloom_filter::Sbbf;
23use crate::column::writer::{
24 compare_greater, fallback_encoding, has_dictionary_support, is_nan, update_max, update_min,
25};
26use crate::data_type::DataType;
27use crate::data_type::private::ParquetValueType;
28use crate::encodings::encoding::{DictEncoder, Encoder, get_encoder};
29use crate::errors::{ParquetError, Result};
30use crate::file::properties::{EnabledStatistics, WriterProperties};
31use crate::geospatial::accumulator::{GeoStatsAccumulator, try_new_geo_stats_accumulator};
32use crate::geospatial::statistics::GeospatialStatistics;
33use crate::schema::types::{ColumnDescPtr, ColumnDescriptor};
34
/// A collection of values that can be handed to a [`ColumnValueEncoder`].
///
/// It is the bound on [`ColumnValueEncoder::Values`]; the only capability
/// required of an implementor is reporting how many values it holds.
pub trait ColumnValues {
    /// Returns the number of values in this collection.
    fn len(&self) -> usize;
}
40
/// Lets an arrow array trait object be used as [`ColumnValues`] when the
/// `arrow` feature is enabled.
#[cfg(feature = "arrow")]
impl ColumnValues for dyn arrow_array::Array {
    /// Delegates to the array's own `len`.
    fn len(&self) -> usize {
        // Fully qualified so the call cannot be confused with
        // `ColumnValues::len`, which is the method being defined here.
        <dyn arrow_array::Array as arrow_array::Array>::len(self)
    }
}
47
48impl<T: ParquetValueType> ColumnValues for [T] {
49 fn len(&self) -> usize {
50 self.len()
51 }
52}
53
/// The encoded contents of a dictionary page, as returned by
/// [`ColumnValueEncoder::flush_dict_page`].
pub struct DictionaryPage {
    /// The encoded dictionary bytes.
    pub buf: Bytes,
    /// The number of entries in the dictionary.
    pub num_values: usize,
    /// Whether the dictionary encoder reported its entries as sorted.
    pub is_sorted: bool,
}
60
/// The encoded values of a data page together with the per-page metadata
/// gathered while they were written, as returned by
/// [`ColumnValueEncoder::flush_data_page`].
pub struct DataPageValues<T> {
    /// The encoded value bytes.
    pub buf: Bytes,
    /// The number of values written since the previous data page flush.
    pub num_values: usize,
    /// The encoding used for `buf`.
    pub encoding: Encoding,
    /// The smallest value observed for this page, if one was tracked.
    pub min_value: Option<T>,
    /// The largest value observed for this page, if one was tracked.
    pub max_value: Option<T>,
    /// The accumulated variable-length byte count for this page, if one was
    /// tracked.
    pub variable_length_bytes: Option<i64>,
}
70
/// An encoder that turns column values into dictionary and data pages.
///
/// Values are fed in with [`Self::write`] / [`Self::write_gather`] and the
/// accumulated output is retrieved with the `flush_*` methods.
pub trait ColumnValueEncoder {
    /// The value type used for the min/max statistics reported in
    /// [`DataPageValues`].
    type T: ParquetValueType;

    /// The collection type this encoder accepts as input; may be unsized
    /// (for example a slice).
    type Values: ColumnValues + ?Sized;

    /// Creates a new encoder for the column described by `descr`, configured
    /// from `props`.
    fn try_new(descr: &ColumnDescPtr, props: &WriterProperties) -> Result<Self>
    where
        Self: Sized;

    /// Writes `len` values from `values`, starting at index `offset`.
    fn write(&mut self, values: &Self::Values, offset: usize, len: usize) -> Result<()>;

    /// Writes the values of `values` found at the positions listed in
    /// `indices`.
    fn write_gather(&mut self, values: &Self::Values, indices: &[usize]) -> Result<()>;

    /// Optionally reports how many of the `len` values starting at `offset`
    /// should be taken for the given `byte_budget`.
    ///
    /// The default implementation returns `None`.
    /// NOTE(review): presumably `None` means "no byte-based estimate is
    /// available" and callers fall back to their own batching — confirm
    /// against the caller.
    fn count_values_within_byte_budget(
        _values: &Self::Values,
        _offset: usize,
        _len: usize,
        _byte_budget: usize,
    ) -> Option<usize> {
        None
    }

    /// Gather variant of [`Self::count_values_within_byte_budget`]: the
    /// candidate values are the entries of `values` selected by `indices`.
    ///
    /// The default implementation returns `None`.
    fn count_values_within_byte_budget_gather(
        _values: &Self::Values,
        _indices: &[usize],
        _byte_budget: usize,
    ) -> Option<usize> {
        None
    }

    /// Returns the number of values written so far.
    /// NOTE(review): in `ColumnValueEncoderImpl` this counter is reset by
    /// `flush_data_page`, i.e. it counts values pending in the current page —
    /// confirm other implementors follow the same convention.
    fn num_values(&self) -> usize;

    /// Returns whether this encoder currently has a dictionary.
    fn has_dictionary(&self) -> bool;

    /// Returns an estimate of the memory used by this encoder, in bytes.
    fn estimated_memory_size(&self) -> usize;

    /// Returns an estimate of the encoded dictionary page size, or `None`
    /// when there is no dictionary.
    fn estimated_dict_page_size(&self) -> Option<usize>;

    /// Returns an estimate of the encoded size of the pending data page.
    fn estimated_data_page_size(&self) -> usize;

    /// Flushes the dictionary page, if any.
    ///
    /// Returns `Ok(None)` when there is no dictionary to flush.
    fn flush_dict_page(&mut self) -> Result<Option<DictionaryPage>>;

    /// Flushes the values written since the last call into a
    /// [`DataPageValues`].
    fn flush_data_page(&mut self) -> Result<DataPageValues<Self::T>>;

    /// Hands over the bloom filter, if one was being built.
    fn flush_bloom_filter(&mut self) -> Option<Sbbf>;

    /// Hands over the computed geospatial statistics, if any.
    fn flush_geospatial_statistics(&mut self) -> Option<Box<GeospatialStatistics>>;
}
167
/// The [`ColumnValueEncoder`] implementation operating on slices of `T::T`.
pub struct ColumnValueEncoderImpl<T: DataType> {
    /// Encoder used when no dictionary encoder is present.
    encoder: Box<dyn Encoder<T>>,
    /// Dictionary encoder; `None` when dictionary encoding is disabled or
    /// unsupported, and after the dictionary page has been flushed.
    dict_encoder: Option<DictEncoder<T>>,
    /// Descriptor of the column being written.
    descr: ColumnDescPtr,
    /// Number of values written since the last data page flush.
    num_values: usize,
    /// Statistics level configured for this column.
    statistics_enabled: EnabledStatistics,
    /// Running minimum for the pending data page; taken on flush.
    min_value: Option<T::T>,
    /// Running maximum for the pending data page; taken on flush.
    max_value: Option<T::T>,
    /// Bloom filter being populated, if enabled for this column.
    bloom_filter: Option<Sbbf>,
    /// Target false-positive probability passed to `fold_to_target_fpp` when
    /// the bloom filter is flushed (`0.0` when no filter is configured).
    bloom_filter_target_fpp: f64,
    /// Accumulated variable-length byte count for the pending data page;
    /// taken on flush.
    variable_length_bytes: Option<i64>,
    /// Accumulator for geospatial statistics, when one could be created for
    /// this column.
    geo_stats_accumulator: Option<Box<dyn GeoStatsAccumulator>>,
}
181
182impl<T: DataType> ColumnValueEncoderImpl<T> {
183 fn min_max(&self, values: &[T::T], value_indices: Option<&[usize]>) -> Option<(T::T, T::T)> {
184 match value_indices {
185 Some(indices) => get_min_max(&self.descr, indices.iter().map(|x| &values[*x])),
186 None => get_min_max(&self.descr, values.iter()),
187 }
188 }
189
190 fn write_slice(&mut self, slice: &[T::T]) -> Result<()> {
191 if self.statistics_enabled != EnabledStatistics::None
192 && self.descr.converted_type() != ConvertedType::INTERVAL
194 {
195 if let Some(accumulator) = self.geo_stats_accumulator.as_deref_mut() {
196 update_geo_stats_accumulator(accumulator, slice.iter());
197 } else if let Some((min, max)) = self.min_max(slice, None) {
198 update_min(&self.descr, &min, &mut self.min_value);
199 update_max(&self.descr, &max, &mut self.max_value);
200 }
201
202 if let Some(var_bytes) = T::T::variable_length_bytes(slice) {
203 *self.variable_length_bytes.get_or_insert(0) += var_bytes;
204 }
205 }
206
207 if let Some(bloom_filter) = &mut self.bloom_filter {
209 for value in slice {
210 bloom_filter.insert(value);
211 }
212 }
213
214 match &mut self.dict_encoder {
215 Some(encoder) => encoder.put(slice),
216 _ => self.encoder.put(slice),
217 }
218 }
219}
220
221impl<T: DataType> ColumnValueEncoder for ColumnValueEncoderImpl<T> {
222 type T = T::T;
223
224 type Values = [T::T];
225
226 fn flush_bloom_filter(&mut self) -> Option<Sbbf> {
227 let mut sbbf = self.bloom_filter.take()?;
228 sbbf.fold_to_target_fpp(self.bloom_filter_target_fpp);
229 Some(sbbf)
230 }
231
232 fn try_new(descr: &ColumnDescPtr, props: &WriterProperties) -> Result<Self> {
233 let dict_supported = props.dictionary_enabled(descr.path())
234 && has_dictionary_support(T::get_physical_type(), props);
235 let dict_encoder = dict_supported.then(|| DictEncoder::new(descr.clone()));
236
237 let encoder = get_encoder(
239 props
240 .encoding(descr.path())
241 .unwrap_or_else(|| fallback_encoding(T::get_physical_type(), props)),
242 descr,
243 )?;
244
245 let statistics_enabled = props.statistics_enabled(descr.path());
246
247 let (bloom_filter, bloom_filter_target_fpp) = create_bloom_filter(props, descr)?;
248
249 let geo_stats_accumulator = try_new_geo_stats_accumulator(descr);
250
251 Ok(Self {
252 encoder,
253 dict_encoder,
254 descr: descr.clone(),
255 num_values: 0,
256 statistics_enabled,
257 bloom_filter,
258 bloom_filter_target_fpp,
259 min_value: None,
260 max_value: None,
261 variable_length_bytes: None,
262 geo_stats_accumulator,
263 })
264 }
265
266 fn write(&mut self, values: &[T::T], offset: usize, len: usize) -> Result<()> {
267 self.num_values += len;
268
269 let slice = values.get(offset..offset + len).ok_or_else(|| {
270 general_err!(
271 "Expected to write {} values, but have only {}",
272 len,
273 values.len() - offset
274 )
275 })?;
276
277 self.write_slice(slice)
278 }
279
280 fn write_gather(&mut self, values: &Self::Values, indices: &[usize]) -> Result<()> {
281 self.num_values += indices.len();
282 let slice: Vec<_> = indices.iter().map(|idx| values[*idx].clone()).collect();
283 self.write_slice(&slice)
284 }
285
286 fn count_values_within_byte_budget(
287 values: &[T::T],
288 offset: usize,
289 len: usize,
290 byte_budget: usize,
291 ) -> Option<usize> {
292 let end = (offset + len).min(values.len());
296 let start = offset.min(end);
297 count_within_budget::<T>(
298 end - start,
299 byte_budget,
300 values[start..end].iter().map(Some),
301 )
302 }
303
304 fn count_values_within_byte_budget_gather(
305 values: &[T::T],
306 indices: &[usize],
307 byte_budget: usize,
308 ) -> Option<usize> {
309 count_within_budget::<T>(
313 indices.len(),
314 byte_budget,
315 indices.iter().map(|&i| values.get(i)),
316 )
317 }
318
319 fn num_values(&self) -> usize {
320 self.num_values
321 }
322
323 fn has_dictionary(&self) -> bool {
324 self.dict_encoder.is_some()
325 }
326
327 fn estimated_memory_size(&self) -> usize {
328 let encoder_size = self.encoder.estimated_memory_size();
329
330 let dict_encoder_size = self
331 .dict_encoder
332 .as_ref()
333 .map(|encoder| encoder.estimated_memory_size())
334 .unwrap_or_default();
335
336 let bloom_filter_size = self
337 .bloom_filter
338 .as_ref()
339 .map(|bf| bf.estimated_memory_size())
340 .unwrap_or_default();
341
342 encoder_size + dict_encoder_size + bloom_filter_size
343 }
344
345 fn estimated_dict_page_size(&self) -> Option<usize> {
346 Some(self.dict_encoder.as_ref()?.dict_encoded_size())
347 }
348
349 fn estimated_data_page_size(&self) -> usize {
350 match &self.dict_encoder {
351 Some(encoder) => encoder.estimated_data_encoded_size(),
352 _ => self.encoder.estimated_data_encoded_size(),
353 }
354 }
355
356 fn flush_dict_page(&mut self) -> Result<Option<DictionaryPage>> {
357 match self.dict_encoder.take() {
358 Some(encoder) => {
359 if self.num_values != 0 {
360 return Err(general_err!(
361 "Must flush data pages before flushing dictionary"
362 ));
363 }
364
365 let buf = encoder.write_dict()?;
366
367 Ok(Some(DictionaryPage {
368 buf,
369 num_values: encoder.num_entries(),
370 is_sorted: encoder.is_sorted(),
371 }))
372 }
373 _ => Ok(None),
374 }
375 }
376
377 fn flush_data_page(&mut self) -> Result<DataPageValues<T::T>> {
378 let (buf, encoding) = match &mut self.dict_encoder {
379 Some(encoder) => (encoder.write_indices()?, Encoding::RLE_DICTIONARY),
380 _ => (self.encoder.flush_buffer()?, self.encoder.encoding()),
381 };
382
383 Ok(DataPageValues {
384 buf,
385 encoding,
386 num_values: std::mem::take(&mut self.num_values),
387 min_value: self.min_value.take(),
388 max_value: self.max_value.take(),
389 variable_length_bytes: self.variable_length_bytes.take(),
390 })
391 }
392
393 fn flush_geospatial_statistics(&mut self) -> Option<Box<GeospatialStatistics>> {
394 self.geo_stats_accumulator.as_mut().map(|a| a.finish())?
395 }
396}
397
398fn get_min_max<'a, T, I>(descr: &ColumnDescriptor, mut iter: I) -> Option<(T, T)>
399where
400 T: ParquetValueType + 'a,
401 I: Iterator<Item = &'a T>,
402{
403 let first = loop {
404 let next = iter.next()?;
405 if !is_nan(descr, next) {
406 break next;
407 }
408 };
409
410 let mut min = first;
411 let mut max = first;
412 for val in iter {
413 if is_nan(descr, val) {
414 continue;
415 }
416 if compare_greater(descr, min, val) {
417 min = val;
418 }
419 if compare_greater(descr, val, max) {
420 max = val;
421 }
422 }
423
424 let min = replace_zero(min, descr, -0.0);
433 let max = replace_zero(max, descr, 0.0);
434
435 Some((min, max))
436}
437
438#[inline]
439fn replace_zero<T: ParquetValueType>(val: &T, descr: &ColumnDescriptor, replace: f32) -> T {
440 match T::PHYSICAL_TYPE {
441 Type::FLOAT if f32::from_le_bytes(val.as_bytes().try_into().unwrap()) == 0.0 => {
442 T::try_from_le_slice(&f32::to_le_bytes(replace)).unwrap()
443 }
444 Type::DOUBLE if f64::from_le_bytes(val.as_bytes().try_into().unwrap()) == 0.0 => {
445 T::try_from_le_slice(&f64::to_le_bytes(replace as f64)).unwrap()
446 }
447 Type::FIXED_LEN_BYTE_ARRAY
448 if descr.logical_type_ref() == Some(LogicalType::Float16).as_ref()
449 && f16::from_le_bytes(val.as_bytes().try_into().unwrap()) == f16::NEG_ZERO =>
450 {
451 T::try_from_le_slice(&f16::to_le_bytes(f16::from_f32(replace))).unwrap()
452 }
453 _ => val.clone(),
454 }
455}
456
457pub(crate) fn create_bloom_filter(
460 props: &WriterProperties,
461 descr: &ColumnDescPtr,
462) -> Result<(Option<Sbbf>, f64)> {
463 match props.bloom_filter_properties(descr.path()) {
464 Some(bf_props) => Ok((
465 Some(Sbbf::new_with_ndv_fpp(bf_props.ndv(), bf_props.fpp())?),
466 bf_props.fpp(),
467 )),
468 None => Ok((None, 0.0)),
469 }
470}
471
472fn update_geo_stats_accumulator<'a, T, I>(bounder: &mut dyn GeoStatsAccumulator, iter: I)
473where
474 T: ParquetValueType + 'a,
475 I: Iterator<Item = &'a T>,
476{
477 if bounder.is_valid() {
478 for val in iter {
479 bounder.update_wkb(val.as_bytes());
480 }
481 }
482}
483
484#[inline]
498fn plain_encoded_byte_size<T: DataType>(value: &T::T) -> usize {
499 let (overhead, bytes) = value.dict_encoding_size();
500 match <T::T as ParquetValueType>::PHYSICAL_TYPE {
501 Type::BYTE_ARRAY => overhead + bytes,
503 Type::FIXED_LEN_BYTE_ARRAY => bytes,
506 _ => overhead,
510 }
511}
512
513#[inline]
528fn count_within_budget<'a, T: DataType>(
529 n: usize,
530 byte_budget: usize,
531 vals: impl Iterator<Item = Option<&'a T::T>>,
532) -> Option<usize>
533where
534 T::T: 'a,
535{
536 let phys = <T::T as ParquetValueType>::PHYSICAL_TYPE;
539 if phys != Type::BYTE_ARRAY && phys != Type::FIXED_LEN_BYTE_ARRAY {
540 let per = std::mem::size_of::<T::T>().max(1);
541 return Some((byte_budget / per).max(1).min(n));
542 }
543 let mut cum: usize = 0;
545 for (i, v) in vals.enumerate() {
546 if let Some(v) = v {
547 cum = cum.saturating_add(plain_encoded_byte_size::<T>(v));
548 }
549 if cum > byte_budget {
550 return Some(i + 1);
551 }
552 }
553 Some(n)
554}