1use crate::basic::Encoding;
19use crate::bloom_filter::Sbbf;
20use crate::column::writer::encoder::{
21 ColumnValueEncoder, DataPageValues, DictionaryPage, create_bloom_filter,
22};
23use crate::data_type::{AsBytes, ByteArray, Int32Type};
24use crate::encodings::encoding::{DeltaBitPackEncoder, Encoder};
25use crate::encodings::rle::RleEncoder;
26use crate::errors::{ParquetError, Result};
27use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion};
28use crate::geospatial::accumulator::{GeoStatsAccumulator, try_new_geo_stats_accumulator};
29use crate::geospatial::statistics::GeospatialStatistics;
30use crate::schema::types::ColumnDescPtr;
31use crate::util::bit_util::num_required_bits;
32use crate::util::interner::{Interner, Storage};
33use arrow_array::{
34 Array, ArrayAccessor, BinaryArray, BinaryViewArray, DictionaryArray, FixedSizeBinaryArray,
35 LargeBinaryArray, LargeStringArray, StringArray, StringViewArray,
36};
37use arrow_schema::DataType;
38
/// Downcasts `$array` to a `DictionaryArray` keyed by `arrow_array::types::$key`, views its
/// values as `$val`, and calls `$op` with that typed dictionary followed by any extra `$arg`s.
///
/// Both downcasts are unwrapped, so this panics when `$array` is not a dictionary with the
/// requested key and value types.
macro_rules! downcast_dict_impl {
    ($array:ident, $key:ident, $val:ident, $op:expr $(, $arg:expr)*) => {{
        let dictionary = $array
            .as_any()
            .downcast_ref::<DictionaryArray<arrow_array::types::$key>>()
            .unwrap();
        $op(dictionary.downcast_dict::<$val>().unwrap() $(, $arg)*)
    }};
}
49
/// Dispatches on the dictionary key type `$key_type` and forwards to
/// [`downcast_dict_impl`] with the matching `arrow_array::types` key type.
///
/// `$val` is the concrete array type of the dictionary values, `$array` the array to
/// downcast, and `$op` the callable applied to the typed dictionary (followed by any
/// extra `$arg`s).
///
/// Panics if `$key_type` is not one of the eight integer types handled below; the panic
/// message names the offending key type, matching the diagnostics produced by
/// `downcast_op`.
macro_rules! downcast_dict_op {
    ($key_type:expr, $val:ident, $array:ident, $op:expr $(, $arg:expr)*) => {
        match $key_type.as_ref() {
            DataType::UInt8 => downcast_dict_impl!($array, UInt8Type, $val, $op$(, $arg)*),
            DataType::UInt16 => downcast_dict_impl!($array, UInt16Type, $val, $op$(, $arg)*),
            DataType::UInt32 => downcast_dict_impl!($array, UInt32Type, $val, $op$(, $arg)*),
            DataType::UInt64 => downcast_dict_impl!($array, UInt64Type, $val, $op$(, $arg)*),
            DataType::Int8 => downcast_dict_impl!($array, Int8Type, $val, $op$(, $arg)*),
            DataType::Int16 => downcast_dict_impl!($array, Int16Type, $val, $op$(, $arg)*),
            DataType::Int32 => downcast_dict_impl!($array, Int32Type, $val, $op$(, $arg)*),
            DataType::Int64 => downcast_dict_impl!($array, Int64Type, $val, $op$(, $arg)*),
            // Name the unexpected key type so a violated invariant is diagnosable.
            d => unreachable!("cannot downcast dictionary with {} key to byte array", d),
        }
    };
}
65
/// Downcasts the dynamically typed `$array` according to `$data_type` and calls `$op` with
/// the concrete array followed by any extra `$arg`s.
///
/// Handled types are the string and binary arrays (regular, large and view variants) and
/// dictionaries whose values are `Utf8`, `LargeUtf8`, `Binary`, `LargeBinary` or
/// `FixedSizeBinary`. Any other type hits `unreachable!` and panics.
macro_rules! downcast_op {
    // Internal rule: downcast `$array` to the concrete array type `$ty` and apply `$op`.
    (@array $ty:ty, $array:ident, $op:expr $(, $arg:expr)*) => {
        $op($array.as_any().downcast_ref::<$ty>().unwrap() $(, $arg)*)
    };
    ($data_type:expr, $array:ident, $op:expr $(, $arg:expr)*) => {
        match $data_type {
            DataType::Utf8 => downcast_op!(@array StringArray, $array, $op $(, $arg)*),
            DataType::LargeUtf8 => downcast_op!(@array LargeStringArray, $array, $op $(, $arg)*),
            DataType::Utf8View => downcast_op!(@array StringViewArray, $array, $op $(, $arg)*),
            DataType::Binary => downcast_op!(@array BinaryArray, $array, $op $(, $arg)*),
            DataType::LargeBinary => downcast_op!(@array LargeBinaryArray, $array, $op $(, $arg)*),
            DataType::BinaryView => downcast_op!(@array BinaryViewArray, $array, $op $(, $arg)*),
            DataType::Dictionary(key, value) => match value.as_ref() {
                DataType::Utf8 => downcast_dict_op!(key, StringArray, $array, $op $(, $arg)*),
                DataType::LargeUtf8 => {
                    downcast_dict_op!(key, LargeStringArray, $array, $op $(, $arg)*)
                }
                DataType::Binary => downcast_dict_op!(key, BinaryArray, $array, $op $(, $arg)*),
                DataType::LargeBinary => {
                    downcast_dict_op!(key, LargeBinaryArray, $array, $op $(, $arg)*)
                }
                DataType::FixedSizeBinary(_) => {
                    downcast_dict_op!(key, FixedSizeBinaryArray, $array, $op $(, $arg)*)
                }
                d => unreachable!("cannot downcast {} dictionary value to byte array", d),
            },
            d => unreachable!("cannot downcast {} to byte array", d),
        }
    };
}
101
102struct FallbackEncoder {
104 encoder: FallbackEncoderImpl,
105 num_values: usize,
106 variable_length_bytes: i64,
107}
108
109enum FallbackEncoderImpl {
113 Plain {
114 buffer: Vec<u8>,
115 },
116 DeltaLength {
117 buffer: Vec<u8>,
118 lengths: Box<DeltaBitPackEncoder<Int32Type>>,
119 },
120 Delta {
121 buffer: Vec<u8>,
122 last_value: Vec<u8>,
123 prefix_lengths: Box<DeltaBitPackEncoder<Int32Type>>,
124 suffix_lengths: Box<DeltaBitPackEncoder<Int32Type>>,
125 },
126}
127
128impl FallbackEncoder {
129 fn new(descr: &ColumnDescPtr, props: &WriterProperties) -> Result<Self> {
131 let encoding =
133 props
134 .encoding(descr.path())
135 .unwrap_or_else(|| match props.writer_version() {
136 WriterVersion::PARQUET_1_0 => Encoding::PLAIN,
137 WriterVersion::PARQUET_2_0 => Encoding::DELTA_BYTE_ARRAY,
138 });
139
140 let encoder = match encoding {
141 Encoding::PLAIN => FallbackEncoderImpl::Plain { buffer: vec![] },
142 Encoding::DELTA_LENGTH_BYTE_ARRAY => FallbackEncoderImpl::DeltaLength {
143 buffer: vec![],
144 lengths: Box::new(DeltaBitPackEncoder::new()),
145 },
146 Encoding::DELTA_BYTE_ARRAY => FallbackEncoderImpl::Delta {
147 buffer: vec![],
148 last_value: vec![],
149 prefix_lengths: Box::new(DeltaBitPackEncoder::new()),
150 suffix_lengths: Box::new(DeltaBitPackEncoder::new()),
151 },
152 _ => {
153 return Err(general_err!(
154 "unsupported encoding {} for byte array",
155 encoding
156 ));
157 }
158 };
159
160 Ok(Self {
161 encoder,
162 num_values: 0,
163 variable_length_bytes: 0,
164 })
165 }
166
167 fn encode<T>(&mut self, values: T, indices: impl ExactSizeIterator<Item = usize>)
169 where
170 T: ArrayAccessor + Copy,
171 T::Item: AsRef<[u8]>,
172 {
173 self.num_values += indices.len();
174 match &mut self.encoder {
175 FallbackEncoderImpl::Plain { buffer } => {
176 for idx in indices {
177 let value = values.value(idx);
178 let value = value.as_ref();
179 buffer.extend_from_slice((value.len() as u32).as_bytes());
180 buffer.extend_from_slice(value);
181 self.variable_length_bytes += value.len() as i64;
182 }
183 }
184 FallbackEncoderImpl::DeltaLength { buffer, lengths } => {
185 for idx in indices {
186 let value = values.value(idx);
187 let value = value.as_ref();
188 lengths.put(&[value.len() as i32]).unwrap();
189 buffer.extend_from_slice(value);
190 self.variable_length_bytes += value.len() as i64;
191 }
192 }
193 FallbackEncoderImpl::Delta {
194 buffer,
195 last_value,
196 prefix_lengths,
197 suffix_lengths,
198 } => {
199 for idx in indices {
200 let value = values.value(idx);
201 let value = value.as_ref();
202 let mut prefix_length = 0;
203
204 while prefix_length < last_value.len()
205 && prefix_length < value.len()
206 && last_value[prefix_length] == value[prefix_length]
207 {
208 prefix_length += 1;
209 }
210
211 let suffix_length = value.len() - prefix_length;
212
213 last_value.clear();
214 last_value.extend_from_slice(value);
215
216 buffer.extend_from_slice(&value[prefix_length..]);
217 prefix_lengths.put(&[prefix_length as i32]).unwrap();
218 suffix_lengths.put(&[suffix_length as i32]).unwrap();
219 self.variable_length_bytes += value.len() as i64;
220 }
221 }
222 }
223 }
224
225 fn estimated_data_page_size(&self) -> usize {
230 match &self.encoder {
231 FallbackEncoderImpl::Plain { buffer, .. } => buffer.len(),
232 FallbackEncoderImpl::DeltaLength { buffer, lengths } => {
233 buffer.len() + lengths.estimated_data_encoded_size()
234 }
235 FallbackEncoderImpl::Delta {
236 buffer,
237 prefix_lengths,
238 suffix_lengths,
239 ..
240 } => {
241 buffer.len()
242 + prefix_lengths.estimated_data_encoded_size()
243 + suffix_lengths.estimated_data_encoded_size()
244 }
245 }
246 }
247
248 fn flush_data_page(
249 &mut self,
250 min_value: Option<ByteArray>,
251 max_value: Option<ByteArray>,
252 ) -> Result<DataPageValues<ByteArray>> {
253 let (buf, encoding) = match &mut self.encoder {
254 FallbackEncoderImpl::Plain { buffer } => (std::mem::take(buffer), Encoding::PLAIN),
255 FallbackEncoderImpl::DeltaLength { buffer, lengths } => {
256 let lengths = lengths.flush_buffer()?;
257
258 let mut out = Vec::with_capacity(lengths.len() + buffer.len());
259 out.extend_from_slice(&lengths);
260 out.extend_from_slice(buffer);
261 buffer.clear();
262 (out, Encoding::DELTA_LENGTH_BYTE_ARRAY)
263 }
264 FallbackEncoderImpl::Delta {
265 buffer,
266 prefix_lengths,
267 suffix_lengths,
268 last_value,
269 } => {
270 let prefix_lengths = prefix_lengths.flush_buffer()?;
271 let suffix_lengths = suffix_lengths.flush_buffer()?;
272
273 let mut out =
274 Vec::with_capacity(prefix_lengths.len() + suffix_lengths.len() + buffer.len());
275 out.extend_from_slice(&prefix_lengths);
276 out.extend_from_slice(&suffix_lengths);
277 out.extend_from_slice(buffer);
278 buffer.clear();
279 last_value.clear();
280 (out, Encoding::DELTA_BYTE_ARRAY)
281 }
282 };
283
284 let variable_length_bytes = Some(self.variable_length_bytes);
286 self.variable_length_bytes = 0;
287
288 Ok(DataPageValues {
289 buf: buf.into(),
290 num_values: std::mem::take(&mut self.num_values),
291 encoding,
292 min_value,
293 max_value,
294 variable_length_bytes,
295 })
296 }
297}
298
/// Backing storage for the byte array dictionary.
///
/// Interned values are kept back to back in a single buffer that doubles as the encoded
/// dictionary page.
#[derive(Debug, Default)]
struct ByteArrayStorage {
    /// Every interned value, each preceded by its length written as a `u32`.
    page: Vec<u8>,

    /// For each interned value, the byte range of its data (excluding the length prefix)
    /// within `page`.
    values: Vec<std::ops::Range<usize>>,
}
307
308impl Storage for ByteArrayStorage {
309 type Key = u64;
310 type Value = [u8];
311
312 fn get(&self, idx: Self::Key) -> &Self::Value {
313 &self.page[self.values[idx as usize].clone()]
314 }
315
316 fn push(&mut self, value: &Self::Value) -> Self::Key {
317 let key = self.values.len();
318
319 self.page.reserve(4 + value.len());
320 self.page.extend_from_slice((value.len() as u32).as_bytes());
321
322 let start = self.page.len();
323 self.page.extend_from_slice(value);
324 self.values.push(start..self.page.len());
325
326 key as u64
327 }
328
329 #[allow(dead_code)] fn estimated_memory_size(&self) -> usize {
331 self.page.capacity() * std::mem::size_of::<u8>()
332 + self.values.capacity() * std::mem::size_of::<std::ops::Range<usize>>()
333 }
334}
335
336#[derive(Debug, Default)]
338struct DictEncoder {
339 interner: Interner<ByteArrayStorage>,
340 indices: Vec<u64>,
341 variable_length_bytes: i64,
342}
343
344impl DictEncoder {
345 fn encode<T>(&mut self, values: T, indices: impl ExactSizeIterator<Item = usize>)
347 where
348 T: ArrayAccessor + Copy,
349 T::Item: AsRef<[u8]>,
350 {
351 self.indices.reserve(indices.len());
352
353 for idx in indices {
354 let value = values.value(idx);
355 let interned = self.interner.intern(value.as_ref());
356 self.indices.push(interned);
357 self.variable_length_bytes += value.as_ref().len() as i64;
358 }
359 }
360
361 fn bit_width(&self) -> u8 {
362 let length = self.interner.storage().values.len();
363 num_required_bits(length.saturating_sub(1) as u64)
364 }
365
366 fn estimated_memory_size(&self) -> usize {
367 self.interner.estimated_memory_size() + self.indices.capacity() * std::mem::size_of::<u64>()
368 }
369
370 fn estimated_data_page_size(&self) -> usize {
371 let bit_width = self.bit_width();
372 1 + RleEncoder::max_buffer_size(bit_width, self.indices.len())
373 }
374
375 fn estimated_dict_page_size(&self) -> usize {
376 self.interner.storage().page.len()
377 }
378
379 fn flush_dict_page(self) -> DictionaryPage {
380 let storage = self.interner.into_inner();
381
382 DictionaryPage {
383 buf: storage.page.into(),
384 num_values: storage.values.len(),
385 is_sorted: false,
386 }
387 }
388
389 fn flush_data_page(
390 &mut self,
391 min_value: Option<ByteArray>,
392 max_value: Option<ByteArray>,
393 ) -> DataPageValues<ByteArray> {
394 let num_values = self.indices.len();
395 let buffer_len = self.estimated_data_page_size();
396 let mut buffer = Vec::with_capacity(buffer_len);
397 buffer.push(self.bit_width());
398
399 let mut encoder = RleEncoder::new_from_buf(self.bit_width(), buffer);
400 for index in &self.indices {
401 encoder.put(*index)
402 }
403
404 self.indices.clear();
405
406 let variable_length_bytes = Some(self.variable_length_bytes);
408 self.variable_length_bytes = 0;
409
410 DataPageValues {
411 buf: encoder.consume().into(),
412 num_values,
413 encoding: Encoding::RLE_DICTIONARY,
414 min_value,
415 max_value,
416 variable_length_bytes,
417 }
418 }
419}
420
421pub struct ByteArrayEncoder {
422 fallback: FallbackEncoder,
423 dict_encoder: Option<DictEncoder>,
424 statistics_enabled: EnabledStatistics,
425 min_value: Option<ByteArray>,
426 max_value: Option<ByteArray>,
427 bloom_filter: Option<Sbbf>,
428 bloom_filter_target_fpp: f64,
429 geo_stats_accumulator: Option<Box<dyn GeoStatsAccumulator>>,
430}
431
432impl ColumnValueEncoder for ByteArrayEncoder {
433 type T = ByteArray;
434 type Values = dyn Array;
435 fn flush_bloom_filter(&mut self) -> Option<Sbbf> {
436 let mut sbbf = self.bloom_filter.take()?;
437 sbbf.fold_to_target_fpp(self.bloom_filter_target_fpp);
438 Some(sbbf)
439 }
440
441 fn try_new(descr: &ColumnDescPtr, props: &WriterProperties) -> Result<Self>
442 where
443 Self: Sized,
444 {
445 let dictionary = props
446 .dictionary_enabled(descr.path())
447 .then(DictEncoder::default);
448
449 let fallback = FallbackEncoder::new(descr, props)?;
450
451 let (bloom_filter, bloom_filter_target_fpp) = create_bloom_filter(props, descr)?;
452
453 let statistics_enabled = props.statistics_enabled(descr.path());
454
455 let geo_stats_accumulator = try_new_geo_stats_accumulator(descr);
456
457 Ok(Self {
458 fallback,
459 statistics_enabled,
460 bloom_filter,
461 bloom_filter_target_fpp,
462 dict_encoder: dictionary,
463 min_value: None,
464 max_value: None,
465 geo_stats_accumulator,
466 })
467 }
468
469 fn write(&mut self, _values: &Self::Values, _offset: usize, _len: usize) -> Result<()> {
470 unreachable!("should call write_gather instead")
471 }
472
473 fn write_gather(&mut self, values: &Self::Values, indices: &[usize]) -> Result<()> {
474 downcast_op!(
475 values.data_type(),
476 values,
477 encode,
478 indices.iter().copied(),
479 self
480 );
481 Ok(())
482 }
483
484 fn num_values(&self) -> usize {
485 match &self.dict_encoder {
486 Some(encoder) => encoder.indices.len(),
487 None => self.fallback.num_values,
488 }
489 }
490
491 fn has_dictionary(&self) -> bool {
492 self.dict_encoder.is_some()
493 }
494
495 fn estimated_memory_size(&self) -> usize {
496 let encoder_size = match &self.dict_encoder {
497 Some(encoder) => encoder.estimated_memory_size(),
498 None => self.fallback.estimated_data_page_size(),
501 };
502
503 let bloom_filter_size = self
504 .bloom_filter
505 .as_ref()
506 .map(|bf| bf.estimated_memory_size())
507 .unwrap_or_default();
508
509 let stats_size = self.min_value.as_ref().map(|v| v.len()).unwrap_or_default()
510 + self.max_value.as_ref().map(|v| v.len()).unwrap_or_default();
511
512 encoder_size + bloom_filter_size + stats_size
513 }
514
515 fn estimated_dict_page_size(&self) -> Option<usize> {
516 Some(self.dict_encoder.as_ref()?.estimated_dict_page_size())
517 }
518
519 fn estimated_data_page_size(&self) -> usize {
524 match &self.dict_encoder {
525 Some(encoder) => encoder.estimated_data_page_size(),
526 None => self.fallback.estimated_data_page_size(),
527 }
528 }
529
530 fn flush_dict_page(&mut self) -> Result<Option<DictionaryPage>> {
531 match self.dict_encoder.take() {
532 Some(encoder) => {
533 if !encoder.indices.is_empty() {
534 return Err(general_err!(
535 "Must flush data pages before flushing dictionary"
536 ));
537 }
538
539 Ok(Some(encoder.flush_dict_page()))
540 }
541 _ => Ok(None),
542 }
543 }
544
545 fn flush_data_page(&mut self) -> Result<DataPageValues<ByteArray>> {
546 let min_value = self.min_value.take();
547 let max_value = self.max_value.take();
548
549 match &mut self.dict_encoder {
550 Some(encoder) => Ok(encoder.flush_data_page(min_value, max_value)),
551 _ => self.fallback.flush_data_page(min_value, max_value),
552 }
553 }
554
555 fn flush_geospatial_statistics(&mut self) -> Option<Box<GeospatialStatistics>> {
556 self.geo_stats_accumulator.as_mut().map(|a| a.finish())?
557 }
558}
559
560fn encode<T, I>(values: T, indices: I, encoder: &mut ByteArrayEncoder)
564where
565 T: ArrayAccessor + Copy,
566 T::Item: Copy + Ord + AsRef<[u8]>,
567 I: ExactSizeIterator<Item = usize> + Clone,
568{
569 if encoder.statistics_enabled != EnabledStatistics::None {
570 if let Some(accumulator) = encoder.geo_stats_accumulator.as_mut() {
571 update_geo_stats_accumulator(accumulator.as_mut(), values, indices.clone());
572 } else if let Some((min, max)) = compute_min_max(values, indices.clone()) {
573 if encoder.min_value.as_ref().is_none_or(|m| m > &min) {
574 encoder.min_value = Some(min);
575 }
576
577 if encoder.max_value.as_ref().is_none_or(|m| m < &max) {
578 encoder.max_value = Some(max);
579 }
580 }
581 }
582
583 if let Some(bloom_filter) = &mut encoder.bloom_filter {
585 for idx in indices.clone() {
586 bloom_filter.insert(values.value(idx).as_ref());
587 }
588 }
589
590 match &mut encoder.dict_encoder {
591 Some(dict_encoder) => dict_encoder.encode(values, indices),
592 None => encoder.fallback.encode(values, indices),
593 }
594}
595
596fn compute_min_max<T>(
600 array: T,
601 mut valid: impl Iterator<Item = usize>,
602) -> Option<(ByteArray, ByteArray)>
603where
604 T: ArrayAccessor,
605 T::Item: Copy + Ord + AsRef<[u8]>,
606{
607 let first_idx = valid.next()?;
608
609 let first_val = array.value(first_idx);
610 let mut min = first_val;
611 let mut max = first_val;
612 for idx in valid {
613 let val = array.value(idx);
614 min = min.min(val);
615 max = max.max(val);
616 }
617 Some((min.as_ref().to_vec().into(), max.as_ref().to_vec().into()))
618}
619
620fn update_geo_stats_accumulator<T>(
622 bounder: &mut dyn GeoStatsAccumulator,
623 array: T,
624 valid: impl Iterator<Item = usize>,
625) where
626 T: ArrayAccessor,
627 T::Item: Copy + Ord + AsRef<[u8]>,
628{
629 if bounder.is_valid() {
630 for idx in valid {
631 let val = array.value(idx);
632 bounder.update_wkb(val.as_ref());
633 }
634 }
635}