1use crate::basic::Encoding;
19use crate::bloom_filter::Sbbf;
20use crate::column::writer::encoder::{
21 ColumnValueEncoder, DataPageValues, DictionaryPage, create_bloom_filter,
22};
23use crate::data_type::{AsBytes, ByteArray, Int32Type};
24use crate::encodings::encoding::{DeltaBitPackEncoder, Encoder};
25use crate::encodings::rle::RleEncoder;
26use crate::errors::{ParquetError, Result};
27use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion};
28use crate::geospatial::accumulator::{GeoStatsAccumulator, try_new_geo_stats_accumulator};
29use crate::geospatial::statistics::GeospatialStatistics;
30use crate::schema::types::ColumnDescPtr;
31use crate::util::bit_util::num_required_bits;
32use crate::util::interner::{Interner, Storage};
33use arrow_array::{
34 Array, ArrayAccessor, BinaryArray, BinaryViewArray, DictionaryArray, FixedSizeBinaryArray,
35 LargeBinaryArray, LargeStringArray, StringArray, StringViewArray,
36};
37use arrow_schema::DataType;
38
/// Invokes `$op` with `$array` downcast to a [`DictionaryArray`] with key
/// type `$key`, viewed as a typed dictionary over values of type `$val`,
/// followed by any additional arguments.
///
/// Panics if `$array` is not a dictionary array of the expected key and
/// value types.
macro_rules! downcast_dict_impl {
    ($array:ident, $key:ident, $val:ident, $op:expr $(, $arg:expr)*) => {{
        $op($array
            .as_any()
            .downcast_ref::<DictionaryArray<arrow_array::types::$key>>()
            .unwrap()
            .downcast_dict::<$val>()
            .unwrap()$(, $arg)*)
    }};
}
49
/// Dispatches on a dictionary array's integer key type, delegating to
/// [`downcast_dict_impl`] with the matching arrow key type.
///
/// The catch-all arm is `unreachable!` because arrow dictionary keys are
/// always one of the integer types matched here.
macro_rules! downcast_dict_op {
    ($key_type:expr, $val:ident, $array:ident, $op:expr $(, $arg:expr)*) => {
        match $key_type.as_ref() {
            DataType::UInt8 => downcast_dict_impl!($array, UInt8Type, $val, $op$(, $arg)*),
            DataType::UInt16 => downcast_dict_impl!($array, UInt16Type, $val, $op$(, $arg)*),
            DataType::UInt32 => downcast_dict_impl!($array, UInt32Type, $val, $op$(, $arg)*),
            DataType::UInt64 => downcast_dict_impl!($array, UInt64Type, $val, $op$(, $arg)*),
            DataType::Int8 => downcast_dict_impl!($array, Int8Type, $val, $op$(, $arg)*),
            DataType::Int16 => downcast_dict_impl!($array, Int16Type, $val, $op$(, $arg)*),
            DataType::Int32 => downcast_dict_impl!($array, Int32Type, $val, $op$(, $arg)*),
            DataType::Int64 => downcast_dict_impl!($array, Int64Type, $val, $op$(, $arg)*),
            _ => unreachable!(),
        }
    };
}
65
/// Invokes `$op` with `$array` downcast to the concrete string/binary array
/// type indicated by `$data_type`, forwarding any additional arguments.
/// Dictionary arrays are further dispatched on their key type via
/// [`downcast_dict_op`].
///
/// `unreachable!` for data types that cannot be viewed as byte arrays.
macro_rules! downcast_op {
    ($data_type:expr, $array:ident, $op:expr $(, $arg:expr)*) => {
        match $data_type {
            DataType::Utf8 => $op($array.as_any().downcast_ref::<StringArray>().unwrap()$(, $arg)*),
            DataType::LargeUtf8 => {
                $op($array.as_any().downcast_ref::<LargeStringArray>().unwrap()$(, $arg)*)
            }
            DataType::Utf8View => $op($array.as_any().downcast_ref::<StringViewArray>().unwrap()$(, $arg)*),
            DataType::Binary => {
                $op($array.as_any().downcast_ref::<BinaryArray>().unwrap()$(, $arg)*)
            }
            DataType::LargeBinary => {
                $op($array.as_any().downcast_ref::<LargeBinaryArray>().unwrap()$(, $arg)*)
            }
            DataType::BinaryView => {
                $op($array.as_any().downcast_ref::<BinaryViewArray>().unwrap()$(, $arg)*)
            }
            DataType::Dictionary(key, value) => match value.as_ref() {
                DataType::Utf8 => downcast_dict_op!(key, StringArray, $array, $op$(, $arg)*),
                DataType::LargeUtf8 => {
                    downcast_dict_op!(key, LargeStringArray, $array, $op$(, $arg)*)
                }
                DataType::Binary => downcast_dict_op!(key, BinaryArray, $array, $op$(, $arg)*),
                DataType::LargeBinary => {
                    downcast_dict_op!(key, LargeBinaryArray, $array, $op$(, $arg)*)
                }
                DataType::FixedSizeBinary(_) => {
                    downcast_dict_op!(key, FixedSizeBinaryArray, $array, $op$(, $arg)*)
                }
                d => unreachable!("cannot downcast {} dictionary value to byte array", d),
            },
            d => unreachable!("cannot downcast {} to byte array", d),
        }
    };
}
101
/// Encoder for byte-array data using a non-dictionary encoding; used by
/// [`ByteArrayEncoder`] whenever no dictionary encoder is active.
struct FallbackEncoder {
    /// The concrete non-dictionary encoding in use
    encoder: FallbackEncoderImpl,
    /// Number of values buffered for the current data page
    num_values: usize,
    /// Total length in bytes of the values buffered since the last page
    /// flush (excluding any length prefixes)
    variable_length_bytes: i64,
}
108
/// The concrete encoding used by [`FallbackEncoder`].
enum FallbackEncoderImpl {
    /// PLAIN: each value written as a 4-byte length prefix
    /// (via `u32::as_bytes`) followed by its raw bytes.
    Plain {
        buffer: Vec<u8>,
    },
    /// DELTA_LENGTH_BYTE_ARRAY: value lengths delta-bit-packed separately,
    /// followed by the concatenated value bytes.
    DeltaLength {
        buffer: Vec<u8>,
        lengths: Box<DeltaBitPackEncoder<Int32Type>>,
    },
    /// DELTA_BYTE_ARRAY: per-value shared-prefix and suffix lengths
    /// delta-bit-packed, followed by the concatenated suffix bytes.
    Delta {
        buffer: Vec<u8>,
        /// The previous value, used to compute the next value's shared prefix
        last_value: Vec<u8>,
        prefix_lengths: Box<DeltaBitPackEncoder<Int32Type>>,
        suffix_lengths: Box<DeltaBitPackEncoder<Int32Type>>,
    },
}
127
impl FallbackEncoder {
    /// Creates a [`FallbackEncoder`] for the column `descr`.
    ///
    /// The encoding is taken from `props` when overridden for this column,
    /// otherwise it defaults to PLAIN for PARQUET_1_0 writers and
    /// DELTA_BYTE_ARRAY for PARQUET_2_0 writers.
    ///
    /// # Errors
    ///
    /// Returns an error if the configured encoding cannot encode byte arrays.
    fn new(descr: &ColumnDescPtr, props: &WriterProperties) -> Result<Self> {
        let encoding =
            props
                .encoding(descr.path())
                .unwrap_or_else(|| match props.writer_version() {
                    WriterVersion::PARQUET_1_0 => Encoding::PLAIN,
                    WriterVersion::PARQUET_2_0 => Encoding::DELTA_BYTE_ARRAY,
                });

        let encoder = match encoding {
            Encoding::PLAIN => FallbackEncoderImpl::Plain { buffer: vec![] },
            Encoding::DELTA_LENGTH_BYTE_ARRAY => FallbackEncoderImpl::DeltaLength {
                buffer: vec![],
                lengths: Box::new(DeltaBitPackEncoder::new()),
            },
            Encoding::DELTA_BYTE_ARRAY => FallbackEncoderImpl::Delta {
                buffer: vec![],
                last_value: vec![],
                prefix_lengths: Box::new(DeltaBitPackEncoder::new()),
                suffix_lengths: Box::new(DeltaBitPackEncoder::new()),
            },
            _ => {
                return Err(general_err!(
                    "unsupported encoding {} for byte array",
                    encoding
                ));
            }
        };

        Ok(Self {
            encoder,
            num_values: 0,
            variable_length_bytes: 0,
        })
    }

    /// Encodes the values at `indices` within `values` into the current
    /// data page.
    fn encode<T>(&mut self, values: T, indices: &[usize])
    where
        T: ArrayAccessor + Copy,
        T::Item: AsRef<[u8]>,
    {
        self.num_values += indices.len();
        match &mut self.encoder {
            FallbackEncoderImpl::Plain { buffer } => {
                for idx in indices {
                    let value = values.value(*idx);
                    let value = value.as_ref();
                    // 4-byte length prefix followed by the raw bytes
                    buffer.extend_from_slice((value.len() as u32).as_bytes());
                    buffer.extend_from_slice(value);
                    self.variable_length_bytes += value.len() as i64;
                }
            }
            FallbackEncoderImpl::DeltaLength { buffer, lengths } => {
                for idx in indices {
                    let value = values.value(*idx);
                    let value = value.as_ref();
                    // Lengths are delta-bit-packed separately from the bytes
                    lengths.put(&[value.len() as i32]).unwrap();
                    buffer.extend_from_slice(value);
                    self.variable_length_bytes += value.len() as i64;
                }
            }
            FallbackEncoderImpl::Delta {
                buffer,
                last_value,
                prefix_lengths,
                suffix_lengths,
            } => {
                for idx in indices {
                    let value = values.value(*idx);
                    let value = value.as_ref();

                    // Length of the prefix this value shares with the
                    // previous value
                    let mut prefix_length = 0;

                    while prefix_length < last_value.len()
                        && prefix_length < value.len()
                        && last_value[prefix_length] == value[prefix_length]
                    {
                        prefix_length += 1;
                    }

                    let suffix_length = value.len() - prefix_length;

                    last_value.clear();
                    last_value.extend_from_slice(value);

                    // Only the non-shared suffix bytes are buffered
                    buffer.extend_from_slice(&value[prefix_length..]);
                    prefix_lengths.put(&[prefix_length as i32]).unwrap();
                    suffix_lengths.put(&[suffix_length as i32]).unwrap();
                    self.variable_length_bytes += value.len() as i64;
                }
            }
        }
    }

    /// Estimate of the encoded size in bytes of the current data page.
    fn estimated_data_page_size(&self) -> usize {
        match &self.encoder {
            FallbackEncoderImpl::Plain { buffer, .. } => buffer.len(),
            FallbackEncoderImpl::DeltaLength { buffer, lengths } => {
                buffer.len() + lengths.estimated_data_encoded_size()
            }
            FallbackEncoderImpl::Delta {
                buffer,
                prefix_lengths,
                suffix_lengths,
                ..
            } => {
                buffer.len()
                    + prefix_lengths.estimated_data_encoded_size()
                    + suffix_lengths.estimated_data_encoded_size()
            }
        }
    }

    /// Flushes the buffered values into a [`DataPageValues`], resetting this
    /// encoder for the next page. `min_value`/`max_value` are passed through
    /// unchanged as the page statistics.
    fn flush_data_page(
        &mut self,
        min_value: Option<ByteArray>,
        max_value: Option<ByteArray>,
    ) -> Result<DataPageValues<ByteArray>> {
        let (buf, encoding) = match &mut self.encoder {
            FallbackEncoderImpl::Plain { buffer } => (std::mem::take(buffer), Encoding::PLAIN),
            FallbackEncoderImpl::DeltaLength { buffer, lengths } => {
                let lengths = lengths.flush_buffer()?;

                // Page layout: encoded lengths followed by the value bytes
                let mut out = Vec::with_capacity(lengths.len() + buffer.len());
                out.extend_from_slice(&lengths);
                out.extend_from_slice(buffer);
                buffer.clear();
                (out, Encoding::DELTA_LENGTH_BYTE_ARRAY)
            }
            FallbackEncoderImpl::Delta {
                buffer,
                prefix_lengths,
                suffix_lengths,
                last_value,
            } => {
                let prefix_lengths = prefix_lengths.flush_buffer()?;
                let suffix_lengths = suffix_lengths.flush_buffer()?;

                // Page layout: prefix lengths, suffix lengths, then the
                // concatenated suffix bytes
                let mut out =
                    Vec::with_capacity(prefix_lengths.len() + suffix_lengths.len() + buffer.len());
                out.extend_from_slice(&prefix_lengths);
                out.extend_from_slice(&suffix_lengths);
                out.extend_from_slice(buffer);
                buffer.clear();
                last_value.clear();
                (out, Encoding::DELTA_BYTE_ARRAY)
            }
        };

        // Report and reset the per-page variable length byte count
        let variable_length_bytes = Some(self.variable_length_bytes);
        self.variable_length_bytes = 0;

        Ok(DataPageValues {
            buf: buf.into(),
            num_values: std::mem::take(&mut self.num_values),
            encoding,
            min_value,
            max_value,
            variable_length_bytes,
        })
    }
}
298
/// Storage backing an [`Interner`] of byte-array values.
#[derive(Debug, Default)]
struct ByteArrayStorage {
    /// Encoded dictionary page bytes: each value stored as a 4-byte length
    /// prefix followed by its raw bytes
    page: Vec<u8>,

    /// For each interned value, the range of its raw bytes (excluding the
    /// length prefix) within `page`
    values: Vec<std::ops::Range<usize>>,
}
307
308impl Storage for ByteArrayStorage {
309 type Key = u64;
310 type Value = [u8];
311
312 fn get(&self, idx: Self::Key) -> &Self::Value {
313 &self.page[self.values[idx as usize].clone()]
314 }
315
316 fn push(&mut self, value: &Self::Value) -> Self::Key {
317 let key = self.values.len();
318
319 self.page.reserve(4 + value.len());
320 self.page.extend_from_slice((value.len() as u32).as_bytes());
321
322 let start = self.page.len();
323 self.page.extend_from_slice(value);
324 self.values.push(start..self.page.len());
325
326 key as u64
327 }
328
329 #[allow(dead_code)] fn estimated_memory_size(&self) -> usize {
331 self.page.capacity() * std::mem::size_of::<u8>()
332 + self.values.capacity() * std::mem::size_of::<std::ops::Range<usize>>()
333 }
334}
335
/// Dictionary encoder for byte-array columns.
#[derive(Debug, Default)]
struct DictEncoder {
    /// Interner deduplicating values into the dictionary page
    interner: Interner<ByteArrayStorage>,
    /// Dictionary key of each encoded value, in encode order
    indices: Vec<u64>,
    /// Total length in bytes of the values encoded since the last page flush
    variable_length_bytes: i64,
}
343
344impl DictEncoder {
345 fn encode<T>(&mut self, values: T, indices: &[usize])
347 where
348 T: ArrayAccessor + Copy,
349 T::Item: AsRef<[u8]>,
350 {
351 self.indices.reserve(indices.len());
352
353 for idx in indices {
354 let value = values.value(*idx);
355 let interned = self.interner.intern(value.as_ref());
356 self.indices.push(interned);
357 self.variable_length_bytes += value.as_ref().len() as i64;
358 }
359 }
360
361 fn bit_width(&self) -> u8 {
362 let length = self.interner.storage().values.len();
363 num_required_bits(length.saturating_sub(1) as u64)
364 }
365
366 fn estimated_memory_size(&self) -> usize {
367 self.interner.estimated_memory_size() + self.indices.capacity() * std::mem::size_of::<u64>()
368 }
369
370 fn estimated_data_page_size(&self) -> usize {
371 let bit_width = self.bit_width();
372 1 + RleEncoder::max_buffer_size(bit_width, self.indices.len())
373 }
374
375 fn estimated_dict_page_size(&self) -> usize {
376 self.interner.storage().page.len()
377 }
378
379 fn flush_dict_page(self) -> DictionaryPage {
380 let storage = self.interner.into_inner();
381
382 DictionaryPage {
383 buf: storage.page.into(),
384 num_values: storage.values.len(),
385 is_sorted: false,
386 }
387 }
388
389 fn flush_data_page(
390 &mut self,
391 min_value: Option<ByteArray>,
392 max_value: Option<ByteArray>,
393 ) -> DataPageValues<ByteArray> {
394 let num_values = self.indices.len();
395 let buffer_len = self.estimated_data_page_size();
396 let mut buffer = Vec::with_capacity(buffer_len);
397 buffer.push(self.bit_width());
398
399 let mut encoder = RleEncoder::new_from_buf(self.bit_width(), buffer);
400 for index in &self.indices {
401 encoder.put(*index)
402 }
403
404 self.indices.clear();
405
406 let variable_length_bytes = Some(self.variable_length_bytes);
408 self.variable_length_bytes = 0;
409
410 DataPageValues {
411 buf: encoder.consume().into(),
412 num_values,
413 encoding: Encoding::RLE_DICTIONARY,
414 min_value,
415 max_value,
416 variable_length_bytes,
417 }
418 }
419}
420
/// [`ColumnValueEncoder`] for variable-length byte-array columns.
pub struct ByteArrayEncoder {
    /// Non-dictionary encoder, used whenever `dict_encoder` is `None`
    fallback: FallbackEncoder,
    /// Dictionary encoder, present while dictionary encoding is active
    dict_encoder: Option<DictEncoder>,
    /// Level of statistics to collect for this column
    statistics_enabled: EnabledStatistics,
    /// Minimum value seen since the last data page flush
    min_value: Option<ByteArray>,
    /// Maximum value seen since the last data page flush
    max_value: Option<ByteArray>,
    /// Bloom filter over all written values, if enabled
    bloom_filter: Option<Sbbf>,
    /// Target false-positive probability the bloom filter is folded to on flush
    bloom_filter_target_fpp: f64,
    /// Geospatial statistics accumulator, when `try_new_geo_stats_accumulator`
    /// produces one for this column
    geo_stats_accumulator: Option<Box<dyn GeoStatsAccumulator>>,
}
431
impl ColumnValueEncoder for ByteArrayEncoder {
    type T = ByteArray;
    type Values = dyn Array;

    /// Takes the accumulated bloom filter, folded down to the configured
    /// target false-positive probability, or `None` if none was enabled.
    fn flush_bloom_filter(&mut self) -> Option<Sbbf> {
        let mut sbbf = self.bloom_filter.take()?;
        sbbf.fold_to_target_fpp(self.bloom_filter_target_fpp);
        Some(sbbf)
    }

    /// Creates an encoder for the column `descr` using the settings in `props`.
    fn try_new(descr: &ColumnDescPtr, props: &WriterProperties) -> Result<Self>
    where
        Self: Sized,
    {
        // Only construct a dictionary encoder if enabled for this column
        let dictionary = props
            .dictionary_enabled(descr.path())
            .then(DictEncoder::default);

        let fallback = FallbackEncoder::new(descr, props)?;

        let (bloom_filter, bloom_filter_target_fpp) = create_bloom_filter(props, descr)?;

        let statistics_enabled = props.statistics_enabled(descr.path());

        let geo_stats_accumulator = try_new_geo_stats_accumulator(descr);

        Ok(Self {
            fallback,
            statistics_enabled,
            bloom_filter,
            bloom_filter_target_fpp,
            dict_encoder: dictionary,
            min_value: None,
            max_value: None,
            geo_stats_accumulator,
        })
    }

    /// Unsupported: byte arrays are always written via [`Self::write_gather`].
    fn write(&mut self, _values: &Self::Values, _offset: usize, _len: usize) -> Result<()> {
        unreachable!("should call write_gather instead")
    }

    /// Encodes the values at `indices` within `values`, which must be a
    /// string/binary array or a dictionary thereof (see `downcast_op!`).
    fn write_gather(&mut self, values: &Self::Values, indices: &[usize]) -> Result<()> {
        downcast_op!(values.data_type(), values, encode, indices, self);
        Ok(())
    }

    /// Number of values buffered for the current data page.
    fn num_values(&self) -> usize {
        match &self.dict_encoder {
            Some(encoder) => encoder.indices.len(),
            None => self.fallback.num_values,
        }
    }

    /// Returns true while dictionary encoding is still active.
    fn has_dictionary(&self) -> bool {
        self.dict_encoder.is_some()
    }

    /// Estimate of the total memory held by this encoder, in bytes.
    fn estimated_memory_size(&self) -> usize {
        let encoder_size = match &self.dict_encoder {
            Some(encoder) => encoder.estimated_memory_size(),
            // NOTE(review): the fallback's estimated encoded page size is
            // used here as a proxy for its memory usage
            None => self.fallback.estimated_data_page_size(),
        };

        let bloom_filter_size = self
            .bloom_filter
            .as_ref()
            .map(|bf| bf.estimated_memory_size())
            .unwrap_or_default();

        let stats_size = self.min_value.as_ref().map(|v| v.len()).unwrap_or_default()
            + self.max_value.as_ref().map(|v| v.len()).unwrap_or_default();

        encoder_size + bloom_filter_size + stats_size
    }

    /// Estimated size of the dictionary page, or `None` if dictionary
    /// encoding is not active.
    fn estimated_dict_page_size(&self) -> Option<usize> {
        Some(self.dict_encoder.as_ref()?.estimated_dict_page_size())
    }

    /// Estimated encoded size of the current data page.
    fn estimated_data_page_size(&self) -> usize {
        match &self.dict_encoder {
            Some(encoder) => encoder.estimated_data_page_size(),
            None => self.fallback.estimated_data_page_size(),
        }
    }

    /// Flushes the dictionary page, consuming the dictionary encoder.
    ///
    /// # Errors
    ///
    /// Returns an error if values remain buffered — data pages must be
    /// flushed before the dictionary.
    fn flush_dict_page(&mut self) -> Result<Option<DictionaryPage>> {
        match self.dict_encoder.take() {
            Some(encoder) => {
                if !encoder.indices.is_empty() {
                    return Err(general_err!(
                        "Must flush data pages before flushing dictionary"
                    ));
                }

                Ok(Some(encoder.flush_dict_page()))
            }
            _ => Ok(None),
        }
    }

    /// Flushes buffered values into a data page, taking the current min/max
    /// statistics with them.
    fn flush_data_page(&mut self) -> Result<DataPageValues<ByteArray>> {
        let min_value = self.min_value.take();
        let max_value = self.max_value.take();

        match &mut self.dict_encoder {
            Some(encoder) => Ok(encoder.flush_data_page(min_value, max_value)),
            _ => self.fallback.flush_data_page(min_value, max_value),
        }
    }

    /// Takes the accumulated geospatial statistics, if any.
    fn flush_geospatial_statistics(&mut self) -> Option<Box<GeospatialStatistics>> {
        self.geo_stats_accumulator.as_mut().map(|a| a.finish())?
    }
}
553
554fn encode<T>(values: T, indices: &[usize], encoder: &mut ByteArrayEncoder)
558where
559 T: ArrayAccessor + Copy,
560 T::Item: Copy + Ord + AsRef<[u8]>,
561{
562 if encoder.statistics_enabled != EnabledStatistics::None {
563 if let Some(accumulator) = encoder.geo_stats_accumulator.as_mut() {
564 update_geo_stats_accumulator(accumulator.as_mut(), values, indices.iter().cloned());
565 } else if let Some((min, max)) = compute_min_max(values, indices.iter().cloned()) {
566 if encoder.min_value.as_ref().is_none_or(|m| m > &min) {
567 encoder.min_value = Some(min);
568 }
569
570 if encoder.max_value.as_ref().is_none_or(|m| m < &max) {
571 encoder.max_value = Some(max);
572 }
573 }
574 }
575
576 if let Some(bloom_filter) = &mut encoder.bloom_filter {
578 let valid = indices.iter().cloned();
579 for idx in valid {
580 bloom_filter.insert(values.value(idx).as_ref());
581 }
582 }
583
584 match &mut encoder.dict_encoder {
585 Some(dict_encoder) => dict_encoder.encode(values, indices),
586 None => encoder.fallback.encode(values, indices),
587 }
588}
589
590fn compute_min_max<T>(
594 array: T,
595 mut valid: impl Iterator<Item = usize>,
596) -> Option<(ByteArray, ByteArray)>
597where
598 T: ArrayAccessor,
599 T::Item: Copy + Ord + AsRef<[u8]>,
600{
601 let first_idx = valid.next()?;
602
603 let first_val = array.value(first_idx);
604 let mut min = first_val;
605 let mut max = first_val;
606 for idx in valid {
607 let val = array.value(idx);
608 min = min.min(val);
609 max = max.max(val);
610 }
611 Some((min.as_ref().to_vec().into(), max.as_ref().to_vec().into()))
612}
613
614fn update_geo_stats_accumulator<T>(
616 bounder: &mut dyn GeoStatsAccumulator,
617 array: T,
618 valid: impl Iterator<Item = usize>,
619) where
620 T: ArrayAccessor,
621 T::Item: Copy + Ord + AsRef<[u8]>,
622{
623 if bounder.is_valid() {
624 for idx in valid {
625 let val = array.value(idx);
626 bounder.update_wkb(val.as_ref());
627 }
628 }
629}