1use crate::basic::Encoding;
19use crate::bloom_filter::Sbbf;
20use crate::column::writer::encoder::{ColumnValueEncoder, DataPageValues, DictionaryPage};
21use crate::data_type::{AsBytes, ByteArray, Int32Type};
22use crate::encodings::encoding::{DeltaBitPackEncoder, Encoder};
23use crate::encodings::rle::RleEncoder;
24use crate::errors::{ParquetError, Result};
25use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion};
26use crate::geospatial::accumulator::{GeoStatsAccumulator, try_new_geo_stats_accumulator};
27use crate::geospatial::statistics::GeospatialStatistics;
28use crate::schema::types::ColumnDescPtr;
29use crate::util::bit_util::num_required_bits;
30use crate::util::interner::{Interner, Storage};
31use arrow_array::{
32 Array, ArrayAccessor, BinaryArray, BinaryViewArray, DictionaryArray, FixedSizeBinaryArray,
33 LargeBinaryArray, LargeStringArray, StringArray, StringViewArray,
34};
35use arrow_schema::DataType;
36
/// Downcasts `$array` to a [`DictionaryArray`] keyed by `$key`, then to a typed
/// dictionary over `$val` values, and applies `$op` to the result, forwarding
/// any trailing `$arg`s. Panics (via `unwrap`) if the array is not of the
/// expected type — callers guarantee the type via `downcast_dict_op!`.
macro_rules! downcast_dict_impl {
    ($array:ident, $key:ident, $val:ident, $op:expr $(, $arg:expr)*) => {{
        $op($array
            .as_any()
            .downcast_ref::<DictionaryArray<arrow_array::types::$key>>()
            .unwrap()
            .downcast_dict::<$val>()
            .unwrap()$(, $arg)*)
    }};
}
47
/// Dispatches on the dictionary key [`DataType`] and delegates to
/// `downcast_dict_impl!` with the matching arrow key type. Only integer key
/// types are possible for arrow dictionaries, hence the `unreachable!` arm.
macro_rules! downcast_dict_op {
    ($key_type:expr, $val:ident, $array:ident, $op:expr $(, $arg:expr)*) => {
        match $key_type.as_ref() {
            DataType::UInt8 => downcast_dict_impl!($array, UInt8Type, $val, $op$(, $arg)*),
            DataType::UInt16 => downcast_dict_impl!($array, UInt16Type, $val, $op$(, $arg)*),
            DataType::UInt32 => downcast_dict_impl!($array, UInt32Type, $val, $op$(, $arg)*),
            DataType::UInt64 => downcast_dict_impl!($array, UInt64Type, $val, $op$(, $arg)*),
            DataType::Int8 => downcast_dict_impl!($array, Int8Type, $val, $op$(, $arg)*),
            DataType::Int16 => downcast_dict_impl!($array, Int16Type, $val, $op$(, $arg)*),
            DataType::Int32 => downcast_dict_impl!($array, Int32Type, $val, $op$(, $arg)*),
            DataType::Int64 => downcast_dict_impl!($array, Int64Type, $val, $op$(, $arg)*),
            _ => unreachable!(),
        }
    };
}
63
/// Downcasts a `dyn Array` of any byte-array-like [`DataType`] (string/binary
/// variants, including views and dictionaries thereof) to its concrete array
/// type and applies `$op`, forwarding any trailing `$arg`s.
///
/// Panics on data types that cannot be written as parquet byte arrays; the
/// arrow writer only routes byte-array-like columns here.
macro_rules! downcast_op {
    ($data_type:expr, $array:ident, $op:expr $(, $arg:expr)*) => {
        match $data_type {
            DataType::Utf8 => $op($array.as_any().downcast_ref::<StringArray>().unwrap()$(, $arg)*),
            DataType::LargeUtf8 => {
                $op($array.as_any().downcast_ref::<LargeStringArray>().unwrap()$(, $arg)*)
            }
            DataType::Utf8View => $op($array.as_any().downcast_ref::<StringViewArray>().unwrap()$(, $arg)*),
            DataType::Binary => {
                $op($array.as_any().downcast_ref::<BinaryArray>().unwrap()$(, $arg)*)
            }
            DataType::LargeBinary => {
                $op($array.as_any().downcast_ref::<LargeBinaryArray>().unwrap()$(, $arg)*)
            }
            DataType::BinaryView => {
                $op($array.as_any().downcast_ref::<BinaryViewArray>().unwrap()$(, $arg)*)
            }
            DataType::Dictionary(key, value) => match value.as_ref() {
                DataType::Utf8 => downcast_dict_op!(key, StringArray, $array, $op$(, $arg)*),
                DataType::LargeUtf8 => {
                    downcast_dict_op!(key, LargeStringArray, $array, $op$(, $arg)*)
                }
                DataType::Binary => downcast_dict_op!(key, BinaryArray, $array, $op$(, $arg)*),
                DataType::LargeBinary => {
                    downcast_dict_op!(key, LargeBinaryArray, $array, $op$(, $arg)*)
                }
                DataType::FixedSizeBinary(_) => {
                    downcast_dict_op!(key, FixedSizeBinaryArray, $array, $op$(, $arg)*)
                }
                d => unreachable!("cannot downcast {} dictionary value to byte array", d),
            },
            d => unreachable!("cannot downcast {} to byte array", d),
        }
    };
}
99
/// Encoder used when dictionary encoding is disabled, or after the writer
/// abandons dictionary encoding mid-column. Wraps one of the non-dictionary
/// byte array encodings.
struct FallbackEncoder {
    /// The concrete encoding implementation and its buffered state
    encoder: FallbackEncoderImpl,
    /// Number of values buffered since the last data page flush
    num_values: usize,
    /// Total raw byte length of the values buffered since the last flush;
    /// reported via `DataPageValues::variable_length_bytes`
    variable_length_bytes: i64,
}
106
/// The buffered state for each supported non-dictionary byte array encoding
enum FallbackEncoderImpl {
    /// `PLAIN`: each value is written as a 4-byte length prefix followed by
    /// its raw bytes
    Plain {
        buffer: Vec<u8>,
    },
    /// `DELTA_LENGTH_BYTE_ARRAY`: delta-encoded lengths, followed by the
    /// concatenated raw value bytes
    DeltaLength {
        buffer: Vec<u8>,
        lengths: Box<DeltaBitPackEncoder<Int32Type>>,
    },
    /// `DELTA_BYTE_ARRAY`: each value is stored as the length of the prefix it
    /// shares with the previous value, plus its remaining suffix bytes
    Delta {
        /// Concatenated suffix bytes
        buffer: Vec<u8>,
        /// The previously encoded value, used to compute shared prefixes
        last_value: Vec<u8>,
        prefix_lengths: Box<DeltaBitPackEncoder<Int32Type>>,
        suffix_lengths: Box<DeltaBitPackEncoder<Int32Type>>,
    },
}
125
impl FallbackEncoder {
    /// Creates a [`FallbackEncoder`] for the given column.
    ///
    /// The encoding comes from `props` if one is configured for this column's
    /// path, otherwise it defaults by writer version: `PLAIN` for parquet 1.0,
    /// `DELTA_BYTE_ARRAY` for parquet 2.0.
    ///
    /// # Errors
    ///
    /// Returns an error if the selected encoding is not valid for byte arrays.
    fn new(descr: &ColumnDescPtr, props: &WriterProperties) -> Result<Self> {
        let encoding =
            props
                .encoding(descr.path())
                .unwrap_or_else(|| match props.writer_version() {
                    WriterVersion::PARQUET_1_0 => Encoding::PLAIN,
                    WriterVersion::PARQUET_2_0 => Encoding::DELTA_BYTE_ARRAY,
                });

        let encoder = match encoding {
            Encoding::PLAIN => FallbackEncoderImpl::Plain { buffer: vec![] },
            Encoding::DELTA_LENGTH_BYTE_ARRAY => FallbackEncoderImpl::DeltaLength {
                buffer: vec![],
                lengths: Box::new(DeltaBitPackEncoder::new()),
            },
            Encoding::DELTA_BYTE_ARRAY => FallbackEncoderImpl::Delta {
                buffer: vec![],
                last_value: vec![],
                prefix_lengths: Box::new(DeltaBitPackEncoder::new()),
                suffix_lengths: Box::new(DeltaBitPackEncoder::new()),
            },
            _ => {
                return Err(general_err!(
                    "unsupported encoding {} for byte array",
                    encoding
                ));
            }
        };

        Ok(Self {
            encoder,
            num_values: 0,
            variable_length_bytes: 0,
        })
    }

    /// Buffers the values at `indices` from `values` using the active encoding.
    fn encode<T>(&mut self, values: T, indices: &[usize])
    where
        T: ArrayAccessor + Copy,
        T::Item: AsRef<[u8]>,
    {
        self.num_values += indices.len();
        match &mut self.encoder {
            FallbackEncoderImpl::Plain { buffer } => {
                for idx in indices {
                    let value = values.value(*idx);
                    let value = value.as_ref();
                    // PLAIN layout: 4-byte length prefix followed by the raw bytes
                    buffer.extend_from_slice((value.len() as u32).as_bytes());
                    buffer.extend_from_slice(value);
                    self.variable_length_bytes += value.len() as i64;
                }
            }
            FallbackEncoderImpl::DeltaLength { buffer, lengths } => {
                for idx in indices {
                    let value = values.value(*idx);
                    let value = value.as_ref();
                    // Lengths are delta-encoded separately from the value bytes
                    lengths.put(&[value.len() as i32]).unwrap();
                    buffer.extend_from_slice(value);
                    self.variable_length_bytes += value.len() as i64;
                }
            }
            FallbackEncoderImpl::Delta {
                buffer,
                last_value,
                prefix_lengths,
                suffix_lengths,
            } => {
                for idx in indices {
                    let value = values.value(*idx);
                    let value = value.as_ref();
                    let mut prefix_length = 0;

                    // Length of the prefix shared with the previously written value
                    while prefix_length < last_value.len()
                        && prefix_length < value.len()
                        && last_value[prefix_length] == value[prefix_length]
                    {
                        prefix_length += 1;
                    }

                    let suffix_length = value.len() - prefix_length;

                    last_value.clear();
                    last_value.extend_from_slice(value);

                    // Only the non-shared suffix bytes are stored
                    buffer.extend_from_slice(&value[prefix_length..]);
                    prefix_lengths.put(&[prefix_length as i32]).unwrap();
                    suffix_lengths.put(&[suffix_length as i32]).unwrap();
                    self.variable_length_bytes += value.len() as i64;
                }
            }
        }
    }

    /// Estimated encoded size in bytes of the currently buffered data page.
    fn estimated_data_page_size(&self) -> usize {
        match &self.encoder {
            FallbackEncoderImpl::Plain { buffer, .. } => buffer.len(),
            FallbackEncoderImpl::DeltaLength { buffer, lengths } => {
                buffer.len() + lengths.estimated_data_encoded_size()
            }
            FallbackEncoderImpl::Delta {
                buffer,
                prefix_lengths,
                suffix_lengths,
                ..
            } => {
                buffer.len()
                    + prefix_lengths.estimated_data_encoded_size()
                    + suffix_lengths.estimated_data_encoded_size()
            }
        }
    }

    /// Flushes all buffered values into a [`DataPageValues`], resetting the
    /// encoder state for the next page. `min_value`/`max_value` are computed
    /// by the caller and passed through unchanged.
    ///
    /// # Errors
    ///
    /// Returns an error if flushing the underlying delta encoders fails.
    fn flush_data_page(
        &mut self,
        min_value: Option<ByteArray>,
        max_value: Option<ByteArray>,
    ) -> Result<DataPageValues<ByteArray>> {
        let (buf, encoding) = match &mut self.encoder {
            FallbackEncoderImpl::Plain { buffer } => (std::mem::take(buffer), Encoding::PLAIN),
            FallbackEncoderImpl::DeltaLength { buffer, lengths } => {
                // Page layout: delta-encoded lengths, then the value bytes
                let lengths = lengths.flush_buffer()?;

                let mut out = Vec::with_capacity(lengths.len() + buffer.len());
                out.extend_from_slice(&lengths);
                out.extend_from_slice(buffer);
                buffer.clear();
                (out, Encoding::DELTA_LENGTH_BYTE_ARRAY)
            }
            FallbackEncoderImpl::Delta {
                buffer,
                prefix_lengths,
                suffix_lengths,
                last_value,
            } => {
                // Page layout: prefix lengths, then suffix lengths, then suffix bytes
                let prefix_lengths = prefix_lengths.flush_buffer()?;
                let suffix_lengths = suffix_lengths.flush_buffer()?;

                let mut out =
                    Vec::with_capacity(prefix_lengths.len() + suffix_lengths.len() + buffer.len());
                out.extend_from_slice(&prefix_lengths);
                out.extend_from_slice(&suffix_lengths);
                out.extend_from_slice(buffer);
                buffer.clear();
                // Reset so the first value of the next page gets a zero-length prefix
                last_value.clear();
                (out, Encoding::DELTA_BYTE_ARRAY)
            }
        };

        let variable_length_bytes = Some(self.variable_length_bytes);
        self.variable_length_bytes = 0;

        Ok(DataPageValues {
            buf: buf.into(),
            num_values: std::mem::take(&mut self.num_values),
            encoding,
            min_value,
            max_value,
            variable_length_bytes,
        })
    }
}
296
/// Backing storage for an [`Interner`] of byte arrays.
///
/// Values are appended to `page` as they are interned, in a length-prefixed
/// layout, so `page` can be emitted directly as a dictionary page.
#[derive(Debug, Default)]
struct ByteArrayStorage {
    /// The encoded dictionary page bytes accumulated so far
    page: Vec<u8>,

    /// For each interned value, the range of its raw bytes within `page`
    /// (the 4-byte length prefix is excluded from the range)
    values: Vec<std::ops::Range<usize>>,
}
305
306impl Storage for ByteArrayStorage {
307 type Key = u64;
308 type Value = [u8];
309
310 fn get(&self, idx: Self::Key) -> &Self::Value {
311 &self.page[self.values[idx as usize].clone()]
312 }
313
314 fn push(&mut self, value: &Self::Value) -> Self::Key {
315 let key = self.values.len();
316
317 self.page.reserve(4 + value.len());
318 self.page.extend_from_slice((value.len() as u32).as_bytes());
319
320 let start = self.page.len();
321 self.page.extend_from_slice(value);
322 self.values.push(start..self.page.len());
323
324 key as u64
325 }
326
327 #[allow(dead_code)] fn estimated_memory_size(&self) -> usize {
329 self.page.capacity() * std::mem::size_of::<u8>()
330 + self.values.capacity() * std::mem::size_of::<std::ops::Range<usize>>()
331 }
332}
333
/// Dictionary encoder for byte arrays: interns each value and buffers its
/// dictionary key until the data page is flushed
#[derive(Debug, Default)]
struct DictEncoder {
    /// Deduplicating store of the distinct dictionary values
    interner: Interner<ByteArrayStorage>,
    /// Dictionary key of each encoded value, in write order
    indices: Vec<u64>,
    /// Total raw byte length of the values since the last data page flush
    variable_length_bytes: i64,
}
341
impl DictEncoder {
    /// Interns the values at `indices`, buffering one dictionary key per value.
    fn encode<T>(&mut self, values: T, indices: &[usize])
    where
        T: ArrayAccessor + Copy,
        T::Item: AsRef<[u8]>,
    {
        self.indices.reserve(indices.len());

        for idx in indices {
            let value = values.value(*idx);
            let interned = self.interner.intern(value.as_ref());
            self.indices.push(interned);
            self.variable_length_bytes += value.as_ref().len() as i64;
        }
    }

    /// Bit width required to encode keys for the current dictionary size.
    fn bit_width(&self) -> u8 {
        let length = self.interner.storage().values.len();
        // saturating_sub handles the empty-dictionary case (width 0)
        num_required_bits(length.saturating_sub(1) as u64)
    }

    /// Estimated heap usage of the interner plus the buffered key indices.
    fn estimated_memory_size(&self) -> usize {
        self.interner.estimated_memory_size() + self.indices.capacity() * std::mem::size_of::<u64>()
    }

    /// Worst-case encoded size of the buffered keys as an RLE data page
    /// (1 byte of bit width plus the RLE payload).
    fn estimated_data_page_size(&self) -> usize {
        let bit_width = self.bit_width();
        1 + RleEncoder::max_buffer_size(bit_width, self.indices.len())
    }

    /// Size of the dictionary page: the interned values are already stored in
    /// their final encoded form.
    fn estimated_dict_page_size(&self) -> usize {
        self.interner.storage().page.len()
    }

    /// Consumes the encoder, producing the dictionary page of interned values.
    fn flush_dict_page(self) -> DictionaryPage {
        let storage = self.interner.into_inner();

        DictionaryPage {
            buf: storage.page.into(),
            num_values: storage.values.len(),
            is_sorted: false,
        }
    }

    /// Flushes the buffered keys into an RLE_DICTIONARY data page, clearing
    /// the key buffer but keeping the dictionary for subsequent pages.
    /// `min_value`/`max_value` are computed by the caller.
    fn flush_data_page(
        &mut self,
        min_value: Option<ByteArray>,
        max_value: Option<ByteArray>,
    ) -> DataPageValues<ByteArray> {
        let num_values = self.indices.len();
        let buffer_len = self.estimated_data_page_size();
        let mut buffer = Vec::with_capacity(buffer_len);
        // Page layout: 1-byte bit width, then RLE/bit-packed keys
        buffer.push(self.bit_width());

        let mut encoder = RleEncoder::new_from_buf(self.bit_width(), buffer);
        for index in &self.indices {
            encoder.put(*index)
        }

        self.indices.clear();

        let variable_length_bytes = Some(self.variable_length_bytes);
        self.variable_length_bytes = 0;

        DataPageValues {
            buf: encoder.consume().into(),
            num_values,
            encoding: Encoding::RLE_DICTIONARY,
            min_value,
            max_value,
            variable_length_bytes,
        }
    }
}
418
/// [`ColumnValueEncoder`] for variable-length byte array columns.
///
/// Starts out dictionary-encoding when enabled; the writer may flush the
/// dictionary and continue with `fallback` if the dictionary grows too large.
pub struct ByteArrayEncoder {
    /// Non-dictionary encoder, used when `dict_encoder` is `None`
    fallback: FallbackEncoder,
    /// Active dictionary encoder; `None` once dictionary encoding is abandoned
    dict_encoder: Option<DictEncoder>,
    /// Level of statistics collection configured for this column
    statistics_enabled: EnabledStatistics,
    /// Running min for the current data page, if statistics are collected
    min_value: Option<ByteArray>,
    /// Running max for the current data page, if statistics are collected
    max_value: Option<ByteArray>,
    /// Optional split block bloom filter, populated as values are written
    bloom_filter: Option<Sbbf>,
    /// Accumulates geospatial statistics instead of min/max when present
    geo_stats_accumulator: Option<Box<dyn GeoStatsAccumulator>>,
}
428
impl ColumnValueEncoder for ByteArrayEncoder {
    type T = ByteArray;
    type Values = dyn Array;

    /// Takes the bloom filter out of the encoder, leaving `None` behind.
    fn flush_bloom_filter(&mut self) -> Option<Sbbf> {
        self.bloom_filter.take()
    }

    /// Creates an encoder for `descr`, configuring dictionary encoding,
    /// bloom filter, statistics level and geospatial accumulation from `props`.
    ///
    /// # Errors
    ///
    /// Returns an error if the fallback encoding is unsupported or the bloom
    /// filter cannot be constructed.
    fn try_new(descr: &ColumnDescPtr, props: &WriterProperties) -> Result<Self>
    where
        Self: Sized,
    {
        let dictionary = props
            .dictionary_enabled(descr.path())
            .then(DictEncoder::default);

        let fallback = FallbackEncoder::new(descr, props)?;

        let bloom_filter = props
            .bloom_filter_properties(descr.path())
            .map(|props| Sbbf::new_with_ndv_fpp(props.ndv, props.fpp))
            .transpose()?;

        let statistics_enabled = props.statistics_enabled(descr.path());

        let geo_stats_accumulator = try_new_geo_stats_accumulator(descr);

        Ok(Self {
            fallback,
            statistics_enabled,
            bloom_filter,
            dict_encoder: dictionary,
            min_value: None,
            max_value: None,
            geo_stats_accumulator,
        })
    }

    /// Byte array columns are always written via [`Self::write_gather`].
    fn write(&mut self, _values: &Self::Values, _offset: usize, _len: usize) -> Result<()> {
        unreachable!("should call write_gather instead")
    }

    /// Encodes the values at `indices`, dispatching on the array's data type.
    fn write_gather(&mut self, values: &Self::Values, indices: &[usize]) -> Result<()> {
        downcast_op!(values.data_type(), values, encode, indices, self);
        Ok(())
    }

    /// Number of values buffered for the current data page.
    fn num_values(&self) -> usize {
        match &self.dict_encoder {
            Some(encoder) => encoder.indices.len(),
            None => self.fallback.num_values,
        }
    }

    /// Whether dictionary encoding is still active.
    fn has_dictionary(&self) -> bool {
        self.dict_encoder.is_some()
    }

    /// Estimated heap usage of the encoder, bloom filter and buffered statistics.
    fn estimated_memory_size(&self) -> usize {
        let encoder_size = match &self.dict_encoder {
            Some(encoder) => encoder.estimated_memory_size(),
            // Fallback data is already encoded, so its page size estimate
            // doubles as its memory footprint
            None => self.fallback.estimated_data_page_size(),
        };

        let bloom_filter_size = self
            .bloom_filter
            .as_ref()
            .map(|bf| bf.estimated_memory_size())
            .unwrap_or_default();

        let stats_size = self.min_value.as_ref().map(|v| v.len()).unwrap_or_default()
            + self.max_value.as_ref().map(|v| v.len()).unwrap_or_default();

        encoder_size + bloom_filter_size + stats_size
    }

    /// Estimated dictionary page size, or `None` if not dictionary encoding.
    fn estimated_dict_page_size(&self) -> Option<usize> {
        Some(self.dict_encoder.as_ref()?.estimated_dict_page_size())
    }

    /// Estimated encoded size of the current data page.
    fn estimated_data_page_size(&self) -> usize {
        match &self.dict_encoder {
            Some(encoder) => encoder.estimated_data_page_size(),
            None => self.fallback.estimated_data_page_size(),
        }
    }

    /// Flushes the dictionary page, permanently disabling dictionary encoding
    /// (the encoder is taken, so subsequent writes use the fallback).
    ///
    /// # Errors
    ///
    /// Returns an error if values are still buffered: data pages must be
    /// flushed before the dictionary.
    fn flush_dict_page(&mut self) -> Result<Option<DictionaryPage>> {
        match self.dict_encoder.take() {
            Some(encoder) => {
                if !encoder.indices.is_empty() {
                    return Err(general_err!(
                        "Must flush data pages before flushing dictionary"
                    ));
                }

                Ok(Some(encoder.flush_dict_page()))
            }
            _ => Ok(None),
        }
    }

    /// Flushes the buffered values and page statistics into a data page.
    fn flush_data_page(&mut self) -> Result<DataPageValues<ByteArray>> {
        let min_value = self.min_value.take();
        let max_value = self.max_value.take();

        match &mut self.dict_encoder {
            Some(encoder) => Ok(encoder.flush_data_page(min_value, max_value)),
            _ => self.fallback.flush_data_page(min_value, max_value),
        }
    }

    /// Finishes and returns geospatial statistics, if accumulated.
    fn flush_geospatial_statistics(&mut self) -> Option<Box<GeospatialStatistics>> {
        // `?` flattens the Option<Option<_>> produced by map
        self.geo_stats_accumulator.as_mut().map(|a| a.finish())?
    }
}
550
551fn encode<T>(values: T, indices: &[usize], encoder: &mut ByteArrayEncoder)
555where
556 T: ArrayAccessor + Copy,
557 T::Item: Copy + Ord + AsRef<[u8]>,
558{
559 if encoder.statistics_enabled != EnabledStatistics::None {
560 if let Some(accumulator) = encoder.geo_stats_accumulator.as_mut() {
561 update_geo_stats_accumulator(accumulator.as_mut(), values, indices.iter().cloned());
562 } else if let Some((min, max)) = compute_min_max(values, indices.iter().cloned()) {
563 if encoder.min_value.as_ref().is_none_or(|m| m > &min) {
564 encoder.min_value = Some(min);
565 }
566
567 if encoder.max_value.as_ref().is_none_or(|m| m < &max) {
568 encoder.max_value = Some(max);
569 }
570 }
571 }
572
573 if let Some(bloom_filter) = &mut encoder.bloom_filter {
575 let valid = indices.iter().cloned();
576 for idx in valid {
577 bloom_filter.insert(values.value(idx).as_ref());
578 }
579 }
580
581 match &mut encoder.dict_encoder {
582 Some(dict_encoder) => dict_encoder.encode(values, indices),
583 None => encoder.fallback.encode(values, indices),
584 }
585}
586
587fn compute_min_max<T>(
591 array: T,
592 mut valid: impl Iterator<Item = usize>,
593) -> Option<(ByteArray, ByteArray)>
594where
595 T: ArrayAccessor,
596 T::Item: Copy + Ord + AsRef<[u8]>,
597{
598 let first_idx = valid.next()?;
599
600 let first_val = array.value(first_idx);
601 let mut min = first_val;
602 let mut max = first_val;
603 for idx in valid {
604 let val = array.value(idx);
605 min = min.min(val);
606 max = max.max(val);
607 }
608 Some((min.as_ref().to_vec().into(), max.as_ref().to_vec().into()))
609}
610
611fn update_geo_stats_accumulator<T>(
613 bounder: &mut dyn GeoStatsAccumulator,
614 array: T,
615 valid: impl Iterator<Item = usize>,
616) where
617 T: ArrayAccessor,
618 T::Item: Copy + Ord + AsRef<[u8]>,
619{
620 if bounder.is_valid() {
621 for idx in valid {
622 let val = array.value(idx);
623 bounder.update_wkb(val.as_ref());
624 }
625 }
626}