1use bytes::Bytes;
19use half::f16;
20
21use crate::basic::{ConvertedType, Encoding, LogicalType, Type};
22use crate::bloom_filter::Sbbf;
23use crate::column::writer::{
24 compare_greater, fallback_encoding, has_dictionary_support, is_nan, update_max, update_min,
25};
26use crate::data_type::DataType;
27use crate::data_type::private::ParquetValueType;
28use crate::encodings::encoding::{DictEncoder, Encoder, get_encoder};
29use crate::errors::{ParquetError, Result};
30use crate::file::properties::{EnabledStatistics, WriterProperties};
31use crate::geospatial::accumulator::{GeoStatsAccumulator, try_new_geo_stats_accumulator};
32use crate::geospatial::statistics::GeospatialStatistics;
33use crate::schema::types::{ColumnDescPtr, ColumnDescriptor};
34
/// A collection of values that can be written to a column chunk.
///
/// Implemented for slices of parquet values and, behind the `arrow`
/// feature, for arrow arrays, so encoders can be generic over their input.
pub trait ColumnValues {
    /// Returns the number of values in this collection.
    fn len(&self) -> usize;
}
40
#[cfg(feature = "arrow")]
impl ColumnValues for dyn arrow_array::Array {
    fn len(&self) -> usize {
        // Fully qualified call: this is the arrow array's length, not a
        // recursive call to `ColumnValues::len`.
        arrow_array::Array::len(self)
    }
}
47
48impl<T: ParquetValueType> ColumnValues for [T] {
49 fn len(&self) -> usize {
50 self.len()
51 }
52}
53
/// The serialized contents of a flushed dictionary page.
pub struct DictionaryPage {
    /// The encoded dictionary entries
    pub buf: Bytes,
    /// The number of entries in the dictionary
    pub num_values: usize,
    /// Whether the entries are sorted, as reported by the dictionary encoder
    pub is_sorted: bool,
}
60
/// The serialized values of a flushed data page, together with the
/// page-level metadata gathered whilst encoding it.
pub struct DataPageValues<T> {
    /// The encoded values
    pub buf: Bytes,
    /// The number of values encoded into `buf`
    pub num_values: usize,
    /// The encoding used to produce `buf`
    pub encoding: Encoding,
    /// The minimum value seen, if statistics were collected for this page
    pub min_value: Option<T>,
    /// The maximum value seen, if statistics were collected for this page
    pub max_value: Option<T>,
    /// Total bytes of variable-length data, if the type reports it
    pub variable_length_bytes: Option<i64>,
}
70
/// A type that encodes column values into dictionary and data pages, whilst
/// optionally tracking statistics and a bloom filter.
pub trait ColumnValueEncoder {
    /// The parquet value type written by this encoder
    type T: ParquetValueType;

    /// The input collection type accepted by [`Self::write`] and
    /// [`Self::write_gather`]
    type Values: ColumnValues + ?Sized;

    /// Creates an encoder for column `descr` configured from `props`
    fn try_new(descr: &ColumnDescPtr, props: &WriterProperties) -> Result<Self>
    where
        Self: Sized;

    /// Writes `len` values from `values`, starting at `offset`
    fn write(&mut self, values: &Self::Values, offset: usize, len: usize) -> Result<()>;

    /// Writes the values of `values` selected by `indices`
    fn write_gather(&mut self, values: &Self::Values, indices: &[usize]) -> Result<()>;

    /// Number of values written since the last call to [`Self::flush_data_page`]
    fn num_values(&self) -> usize;

    /// Returns true if this encoder currently has a dictionary
    fn has_dictionary(&self) -> bool;

    /// Estimate of the total in-memory size of this encoder
    fn estimated_memory_size(&self) -> usize;

    /// Estimated encoded size of the dictionary page, if dictionary encoding
    fn estimated_dict_page_size(&self) -> Option<usize>;

    /// Estimated encoded size of the currently buffered data page
    fn estimated_data_page_size(&self) -> usize;

    /// Flushes the dictionary page, if any, ending dictionary encoding.
    /// Implementations may return an error if data page values are still
    /// buffered (see [`ColumnValueEncoderImpl::flush_dict_page`]).
    fn flush_dict_page(&mut self) -> Result<Option<DictionaryPage>>;

    /// Flushes the buffered values as a data page, resetting page-level state
    fn flush_data_page(&mut self) -> Result<DataPageValues<Self::T>>;

    /// Takes the accumulated bloom filter, if one was configured
    fn flush_bloom_filter(&mut self) -> Option<Sbbf>;

    /// Takes the accumulated geospatial statistics, if any
    fn flush_geospatial_statistics(&mut self) -> Option<Box<GeospatialStatistics>>;
}
131
/// The default [`ColumnValueEncoder`] operating on slices of parquet values.
pub struct ColumnValueEncoderImpl<T: DataType> {
    /// Non-dictionary value encoder, used when `dict_encoder` is `None`
    encoder: Box<dyn Encoder<T>>,
    /// Dictionary encoder; `Some` whilst dictionary encoding is active
    dict_encoder: Option<DictEncoder<T>>,
    /// Descriptor of the column being written
    descr: ColumnDescPtr,
    /// Values written since the last data page flush
    num_values: usize,
    /// The level of statistics collection configured for this column
    statistics_enabled: EnabledStatistics,
    /// Running minimum for the current data page, if statistics are collected
    min_value: Option<T::T>,
    /// Running maximum for the current data page, if statistics are collected
    max_value: Option<T::T>,
    /// Bloom filter being accumulated, if configured
    bloom_filter: Option<Sbbf>,
    /// Target false-positive probability applied when the filter is flushed
    bloom_filter_target_fpp: f64,
    /// Accumulated variable-length byte count, if the type reports it
    variable_length_bytes: Option<i64>,
    /// Geospatial statistics accumulator; used instead of min/max when present
    geo_stats_accumulator: Option<Box<dyn GeoStatsAccumulator>>,
}
145
impl<T: DataType> ColumnValueEncoderImpl<T> {
    /// Computes the `(min, max)` of `values`, optionally restricted to the
    /// positions listed in `value_indices`.
    ///
    /// Returns `None` if there are no comparable (non-NaN) values.
    fn min_max(&self, values: &[T::T], value_indices: Option<&[usize]>) -> Option<(T::T, T::T)> {
        match value_indices {
            Some(indices) => get_min_max(&self.descr, indices.iter().map(|x| &values[*x])),
            None => get_min_max(&self.descr, values.iter()),
        }
    }

    /// Encodes `slice`, updating statistics, the bloom filter and whichever
    /// encoder (dictionary or plain) is currently active.
    fn write_slice(&mut self, slice: &[T::T]) -> Result<()> {
        // INTERVAL columns are excluded from min/max statistics
        // (presumably because their ordering is unspecified — NOTE(review):
        // confirm against the parquet specification)
        if self.statistics_enabled != EnabledStatistics::None
            && self.descr.converted_type() != ConvertedType::INTERVAL
        {
            // Geospatial columns accumulate bounding statistics instead of
            // scalar min/max values
            if let Some(accumulator) = self.geo_stats_accumulator.as_deref_mut() {
                update_geo_stats_accumulator(accumulator, slice.iter());
            } else if let Some((min, max)) = self.min_max(slice, None) {
                update_min(&self.descr, &min, &mut self.min_value);
                update_max(&self.descr, &max, &mut self.max_value);
            }

            // Track total variable-length bytes when the type reports them
            if let Some(var_bytes) = T::T::variable_length_bytes(slice) {
                *self.variable_length_bytes.get_or_insert(0) += var_bytes;
            }
        }

        if let Some(bloom_filter) = &mut self.bloom_filter {
            for value in slice {
                bloom_filter.insert(value);
            }
        }

        // Prefer the dictionary encoder whilst it remains active
        match &mut self.dict_encoder {
            Some(encoder) => encoder.put(slice),
            _ => self.encoder.put(slice),
        }
    }
}
184
185impl<T: DataType> ColumnValueEncoder for ColumnValueEncoderImpl<T> {
186 type T = T::T;
187
188 type Values = [T::T];
189
190 fn flush_bloom_filter(&mut self) -> Option<Sbbf> {
191 let mut sbbf = self.bloom_filter.take()?;
192 sbbf.fold_to_target_fpp(self.bloom_filter_target_fpp);
193 Some(sbbf)
194 }
195
196 fn try_new(descr: &ColumnDescPtr, props: &WriterProperties) -> Result<Self> {
197 let dict_supported = props.dictionary_enabled(descr.path())
198 && has_dictionary_support(T::get_physical_type(), props);
199 let dict_encoder = dict_supported.then(|| DictEncoder::new(descr.clone()));
200
201 let encoder = get_encoder(
203 props
204 .encoding(descr.path())
205 .unwrap_or_else(|| fallback_encoding(T::get_physical_type(), props)),
206 descr,
207 )?;
208
209 let statistics_enabled = props.statistics_enabled(descr.path());
210
211 let (bloom_filter, bloom_filter_target_fpp) = create_bloom_filter(props, descr)?;
212
213 let geo_stats_accumulator = try_new_geo_stats_accumulator(descr);
214
215 Ok(Self {
216 encoder,
217 dict_encoder,
218 descr: descr.clone(),
219 num_values: 0,
220 statistics_enabled,
221 bloom_filter,
222 bloom_filter_target_fpp,
223 min_value: None,
224 max_value: None,
225 variable_length_bytes: None,
226 geo_stats_accumulator,
227 })
228 }
229
230 fn write(&mut self, values: &[T::T], offset: usize, len: usize) -> Result<()> {
231 self.num_values += len;
232
233 let slice = values.get(offset..offset + len).ok_or_else(|| {
234 general_err!(
235 "Expected to write {} values, but have only {}",
236 len,
237 values.len() - offset
238 )
239 })?;
240
241 self.write_slice(slice)
242 }
243
244 fn write_gather(&mut self, values: &Self::Values, indices: &[usize]) -> Result<()> {
245 self.num_values += indices.len();
246 let slice: Vec<_> = indices.iter().map(|idx| values[*idx].clone()).collect();
247 self.write_slice(&slice)
248 }
249
250 fn num_values(&self) -> usize {
251 self.num_values
252 }
253
254 fn has_dictionary(&self) -> bool {
255 self.dict_encoder.is_some()
256 }
257
258 fn estimated_memory_size(&self) -> usize {
259 let encoder_size = self.encoder.estimated_memory_size();
260
261 let dict_encoder_size = self
262 .dict_encoder
263 .as_ref()
264 .map(|encoder| encoder.estimated_memory_size())
265 .unwrap_or_default();
266
267 let bloom_filter_size = self
268 .bloom_filter
269 .as_ref()
270 .map(|bf| bf.estimated_memory_size())
271 .unwrap_or_default();
272
273 encoder_size + dict_encoder_size + bloom_filter_size
274 }
275
276 fn estimated_dict_page_size(&self) -> Option<usize> {
277 Some(self.dict_encoder.as_ref()?.dict_encoded_size())
278 }
279
280 fn estimated_data_page_size(&self) -> usize {
281 match &self.dict_encoder {
282 Some(encoder) => encoder.estimated_data_encoded_size(),
283 _ => self.encoder.estimated_data_encoded_size(),
284 }
285 }
286
287 fn flush_dict_page(&mut self) -> Result<Option<DictionaryPage>> {
288 match self.dict_encoder.take() {
289 Some(encoder) => {
290 if self.num_values != 0 {
291 return Err(general_err!(
292 "Must flush data pages before flushing dictionary"
293 ));
294 }
295
296 let buf = encoder.write_dict()?;
297
298 Ok(Some(DictionaryPage {
299 buf,
300 num_values: encoder.num_entries(),
301 is_sorted: encoder.is_sorted(),
302 }))
303 }
304 _ => Ok(None),
305 }
306 }
307
308 fn flush_data_page(&mut self) -> Result<DataPageValues<T::T>> {
309 let (buf, encoding) = match &mut self.dict_encoder {
310 Some(encoder) => (encoder.write_indices()?, Encoding::RLE_DICTIONARY),
311 _ => (self.encoder.flush_buffer()?, self.encoder.encoding()),
312 };
313
314 Ok(DataPageValues {
315 buf,
316 encoding,
317 num_values: std::mem::take(&mut self.num_values),
318 min_value: self.min_value.take(),
319 max_value: self.max_value.take(),
320 variable_length_bytes: self.variable_length_bytes.take(),
321 })
322 }
323
324 fn flush_geospatial_statistics(&mut self) -> Option<Box<GeospatialStatistics>> {
325 self.geo_stats_accumulator.as_mut().map(|a| a.finish())?
326 }
327}
328
329fn get_min_max<'a, T, I>(descr: &ColumnDescriptor, mut iter: I) -> Option<(T, T)>
330where
331 T: ParquetValueType + 'a,
332 I: Iterator<Item = &'a T>,
333{
334 let first = loop {
335 let next = iter.next()?;
336 if !is_nan(descr, next) {
337 break next;
338 }
339 };
340
341 let mut min = first;
342 let mut max = first;
343 for val in iter {
344 if is_nan(descr, val) {
345 continue;
346 }
347 if compare_greater(descr, min, val) {
348 min = val;
349 }
350 if compare_greater(descr, val, max) {
351 max = val;
352 }
353 }
354
355 let min = replace_zero(min, descr, -0.0);
364 let max = replace_zero(max, descr, 0.0);
365
366 Some((min, max))
367}
368
369#[inline]
370fn replace_zero<T: ParquetValueType>(val: &T, descr: &ColumnDescriptor, replace: f32) -> T {
371 match T::PHYSICAL_TYPE {
372 Type::FLOAT if f32::from_le_bytes(val.as_bytes().try_into().unwrap()) == 0.0 => {
373 T::try_from_le_slice(&f32::to_le_bytes(replace)).unwrap()
374 }
375 Type::DOUBLE if f64::from_le_bytes(val.as_bytes().try_into().unwrap()) == 0.0 => {
376 T::try_from_le_slice(&f64::to_le_bytes(replace as f64)).unwrap()
377 }
378 Type::FIXED_LEN_BYTE_ARRAY
379 if descr.logical_type_ref() == Some(LogicalType::Float16).as_ref()
380 && f16::from_le_bytes(val.as_bytes().try_into().unwrap()) == f16::NEG_ZERO =>
381 {
382 T::try_from_le_slice(&f16::to_le_bytes(f16::from_f32(replace))).unwrap()
383 }
384 _ => val.clone(),
385 }
386}
387
388pub(crate) fn create_bloom_filter(
391 props: &WriterProperties,
392 descr: &ColumnDescPtr,
393) -> Result<(Option<Sbbf>, f64)> {
394 match props.bloom_filter_properties(descr.path()) {
395 Some(bf_props) => Ok((
396 Some(Sbbf::new_with_ndv_fpp(bf_props.ndv, bf_props.fpp)?),
397 bf_props.fpp,
398 )),
399 None => Ok((None, 0.0)),
400 }
401}
402
403fn update_geo_stats_accumulator<'a, T, I>(bounder: &mut dyn GeoStatsAccumulator, iter: I)
404where
405 T: ParquetValueType + 'a,
406 I: Iterator<Item = &'a T>,
407{
408 if bounder.is_valid() {
409 for val in iter {
410 bounder.update_wkb(val.as_bytes());
411 }
412 }
413}