1use crate::basic::Encoding;
19use crate::bloom_filter::Sbbf;
20use crate::column::writer::encoder::{ColumnValueEncoder, DataPageValues, DictionaryPage};
21use crate::data_type::{AsBytes, ByteArray, Int32Type};
22use crate::encodings::encoding::{DeltaBitPackEncoder, Encoder};
23use crate::encodings::rle::RleEncoder;
24use crate::errors::{ParquetError, Result};
25use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion};
26use crate::schema::types::ColumnDescPtr;
27use crate::util::bit_util::num_required_bits;
28use crate::util::interner::{Interner, Storage};
29use arrow_array::{
30 Array, ArrayAccessor, BinaryArray, BinaryViewArray, DictionaryArray, FixedSizeBinaryArray,
31 LargeBinaryArray, LargeStringArray, StringArray, StringViewArray,
32};
33use arrow_schema::DataType;
34
/// Invokes `$op` with the `DictionaryArray<$key>` view of `$array`, downcast
/// to values of type `$val`, forwarding any additional `$arg`s.
///
/// Panics (via `unwrap`) if `$array` is not a dictionary array with the
/// expected key and value types — callers are expected to have already
/// matched on the array's `DataType`.
macro_rules! downcast_dict_impl {
    ($array:ident, $key:ident, $val:ident, $op:expr $(, $arg:expr)*) => {{
        $op($array
            .as_any()
            .downcast_ref::<DictionaryArray<arrow_array::types::$key>>()
            .unwrap()
            .downcast_dict::<$val>()
            .unwrap()$(, $arg)*)
    }};
}
45
/// Matches on the dictionary key `DataType` and delegates to
/// [`downcast_dict_impl`] with the corresponding arrow key type.
///
/// The `unreachable!` arm encodes the assumption that dictionary keys are
/// always one of the integer types handled here.
macro_rules! downcast_dict_op {
    ($key_type:expr, $val:ident, $array:ident, $op:expr $(, $arg:expr)*) => {
        match $key_type.as_ref() {
            DataType::UInt8 => downcast_dict_impl!($array, UInt8Type, $val, $op$(, $arg)*),
            DataType::UInt16 => downcast_dict_impl!($array, UInt16Type, $val, $op$(, $arg)*),
            DataType::UInt32 => downcast_dict_impl!($array, UInt32Type, $val, $op$(, $arg)*),
            DataType::UInt64 => downcast_dict_impl!($array, UInt64Type, $val, $op$(, $arg)*),
            DataType::Int8 => downcast_dict_impl!($array, Int8Type, $val, $op$(, $arg)*),
            DataType::Int16 => downcast_dict_impl!($array, Int16Type, $val, $op$(, $arg)*),
            DataType::Int32 => downcast_dict_impl!($array, Int32Type, $val, $op$(, $arg)*),
            DataType::Int64 => downcast_dict_impl!($array, Int64Type, $val, $op$(, $arg)*),
            _ => unreachable!(),
        }
    };
}
61
/// Downcasts a `dyn Array` to the concrete byte-array representation implied
/// by `$data_type` — Utf8/Binary (plain, large, and view forms) or a
/// dictionary thereof — and invokes `$op` on the typed array, forwarding any
/// additional `$arg`s.
///
/// The `unreachable!` arms encode the caller contract that `$array` holds a
/// byte-array-compatible type.
macro_rules! downcast_op {
    ($data_type:expr, $array:ident, $op:expr $(, $arg:expr)*) => {
        match $data_type {
            DataType::Utf8 => $op($array.as_any().downcast_ref::<StringArray>().unwrap()$(, $arg)*),
            DataType::LargeUtf8 => {
                $op($array.as_any().downcast_ref::<LargeStringArray>().unwrap()$(, $arg)*)
            }
            DataType::Utf8View => $op($array.as_any().downcast_ref::<StringViewArray>().unwrap()$(, $arg)*),
            DataType::Binary => {
                $op($array.as_any().downcast_ref::<BinaryArray>().unwrap()$(, $arg)*)
            }
            DataType::LargeBinary => {
                $op($array.as_any().downcast_ref::<LargeBinaryArray>().unwrap()$(, $arg)*)
            }
            DataType::BinaryView => {
                $op($array.as_any().downcast_ref::<BinaryViewArray>().unwrap()$(, $arg)*)
            }
            DataType::Dictionary(key, value) => match value.as_ref() {
                DataType::Utf8 => downcast_dict_op!(key, StringArray, $array, $op$(, $arg)*),
                DataType::LargeUtf8 => {
                    downcast_dict_op!(key, LargeStringArray, $array, $op$(, $arg)*)
                }
                DataType::Binary => downcast_dict_op!(key, BinaryArray, $array, $op$(, $arg)*),
                DataType::LargeBinary => {
                    downcast_dict_op!(key, LargeBinaryArray, $array, $op$(, $arg)*)
                }
                DataType::FixedSizeBinary(_) => {
                    downcast_dict_op!(key, FixedSizeBinaryArray, $array, $op$(, $arg)*)
                }
                d => unreachable!("cannot downcast {} dictionary value to byte array", d),
            },
            d => unreachable!("cannot downcast {} to byte array", d),
        }
    };
}
97
/// Encoder for byte arrays used when dictionary encoding is not in effect
/// (disabled, or after falling back from the dictionary encoder).
struct FallbackEncoder {
    /// The active non-dictionary encoding implementation
    encoder: FallbackEncoderImpl,
    /// Number of values buffered for the current data page
    num_values: usize,
    /// Total unencoded byte length of the values buffered so far
    variable_length_bytes: i64,
}
104
/// The non-dictionary encodings supported for byte arrays
enum FallbackEncoderImpl {
    /// PLAIN: each value written as a 4-byte length followed by its bytes
    Plain {
        buffer: Vec<u8>,
    },
    /// DELTA_LENGTH_BYTE_ARRAY: delta-encoded lengths, followed by the
    /// concatenated value bytes
    DeltaLength {
        buffer: Vec<u8>,
        lengths: Box<DeltaBitPackEncoder<Int32Type>>,
    },
    /// DELTA_BYTE_ARRAY: per-value prefix length (shared with the previous
    /// value) and suffix length, both delta-encoded, followed by the suffix
    /// bytes
    Delta {
        buffer: Vec<u8>,
        /// Previous value, used to compute the next value's shared prefix
        last_value: Vec<u8>,
        prefix_lengths: Box<DeltaBitPackEncoder<Int32Type>>,
        suffix_lengths: Box<DeltaBitPackEncoder<Int32Type>>,
    },
}
123
impl FallbackEncoder {
    /// Creates a [`FallbackEncoder`] for the column `descr`, selecting the
    /// encoding from `props`.
    ///
    /// If no encoding is configured for the column's path, defaults to PLAIN
    /// for writer version 1.0 and DELTA_BYTE_ARRAY for 2.0.
    ///
    /// # Errors
    /// Returns an error if the configured encoding is not PLAIN,
    /// DELTA_LENGTH_BYTE_ARRAY or DELTA_BYTE_ARRAY.
    fn new(descr: &ColumnDescPtr, props: &WriterProperties) -> Result<Self> {
        let encoding =
            props
                .encoding(descr.path())
                .unwrap_or_else(|| match props.writer_version() {
                    WriterVersion::PARQUET_1_0 => Encoding::PLAIN,
                    WriterVersion::PARQUET_2_0 => Encoding::DELTA_BYTE_ARRAY,
                });

        let encoder = match encoding {
            Encoding::PLAIN => FallbackEncoderImpl::Plain { buffer: vec![] },
            Encoding::DELTA_LENGTH_BYTE_ARRAY => FallbackEncoderImpl::DeltaLength {
                buffer: vec![],
                lengths: Box::new(DeltaBitPackEncoder::new()),
            },
            Encoding::DELTA_BYTE_ARRAY => FallbackEncoderImpl::Delta {
                buffer: vec![],
                last_value: vec![],
                prefix_lengths: Box::new(DeltaBitPackEncoder::new()),
                suffix_lengths: Box::new(DeltaBitPackEncoder::new()),
            },
            _ => {
                return Err(general_err!(
                    "unsupported encoding {} for byte array",
                    encoding
                ))
            }
        };

        Ok(Self {
            encoder,
            num_values: 0,
            variable_length_bytes: 0,
        })
    }

    /// Encodes the values of `values` at the positions in `indices` into the
    /// in-progress data page, accumulating `variable_length_bytes`.
    fn encode<T>(&mut self, values: T, indices: &[usize])
    where
        T: ArrayAccessor + Copy,
        T::Item: AsRef<[u8]>,
    {
        self.num_values += indices.len();
        match &mut self.encoder {
            FallbackEncoderImpl::Plain { buffer } => {
                for idx in indices {
                    let value = values.value(*idx);
                    let value = value.as_ref();
                    // PLAIN: 4-byte length prefix followed by the raw bytes
                    buffer.extend_from_slice((value.len() as u32).as_bytes());
                    buffer.extend_from_slice(value);
                    self.variable_length_bytes += value.len() as i64;
                }
            }
            FallbackEncoderImpl::DeltaLength { buffer, lengths } => {
                for idx in indices {
                    let value = values.value(*idx);
                    let value = value.as_ref();
                    // Lengths are delta-encoded separately; bytes concatenated
                    lengths.put(&[value.len() as i32]).unwrap();
                    buffer.extend_from_slice(value);
                    self.variable_length_bytes += value.len() as i64;
                }
            }
            FallbackEncoderImpl::Delta {
                buffer,
                last_value,
                prefix_lengths,
                suffix_lengths,
            } => {
                for idx in indices {
                    let value = values.value(*idx);
                    let value = value.as_ref();
                    let mut prefix_length = 0;

                    // Length of the prefix shared with the previous value
                    while prefix_length < last_value.len()
                        && prefix_length < value.len()
                        && last_value[prefix_length] == value[prefix_length]
                    {
                        prefix_length += 1;
                    }

                    let suffix_length = value.len() - prefix_length;

                    // Remember this value for the next iteration's prefix
                    // computation (must happen after computing prefix_length)
                    last_value.clear();
                    last_value.extend_from_slice(value);

                    // Only the non-shared suffix bytes are buffered
                    buffer.extend_from_slice(&value[prefix_length..]);
                    prefix_lengths.put(&[prefix_length as i32]).unwrap();
                    suffix_lengths.put(&[suffix_length as i32]).unwrap();
                    self.variable_length_bytes += value.len() as i64;
                }
            }
        }
    }

    /// Estimated encoded size in bytes of the current data page
    fn estimated_data_page_size(&self) -> usize {
        match &self.encoder {
            FallbackEncoderImpl::Plain { buffer, .. } => buffer.len(),
            FallbackEncoderImpl::DeltaLength { buffer, lengths } => {
                buffer.len() + lengths.estimated_data_encoded_size()
            }
            FallbackEncoderImpl::Delta {
                buffer,
                prefix_lengths,
                suffix_lengths,
                ..
            } => {
                buffer.len()
                    + prefix_lengths.estimated_data_encoded_size()
                    + suffix_lengths.estimated_data_encoded_size()
            }
        }
    }

    /// Flushes the buffered values as an encoded data page, resetting this
    /// encoder for the next page. `min_value`/`max_value` are passed through
    /// to the returned [`DataPageValues`].
    ///
    /// # Errors
    /// Returns an error if flushing a delta-bit-packed length encoder fails.
    fn flush_data_page(
        &mut self,
        min_value: Option<ByteArray>,
        max_value: Option<ByteArray>,
    ) -> Result<DataPageValues<ByteArray>> {
        let (buf, encoding) = match &mut self.encoder {
            FallbackEncoderImpl::Plain { buffer } => (std::mem::take(buffer), Encoding::PLAIN),
            FallbackEncoderImpl::DeltaLength { buffer, lengths } => {
                let lengths = lengths.flush_buffer()?;

                // Page layout: encoded lengths, then the concatenated bytes
                let mut out = Vec::with_capacity(lengths.len() + buffer.len());
                out.extend_from_slice(&lengths);
                out.extend_from_slice(buffer);
                buffer.clear();
                (out, Encoding::DELTA_LENGTH_BYTE_ARRAY)
            }
            FallbackEncoderImpl::Delta {
                buffer,
                prefix_lengths,
                suffix_lengths,
                last_value,
            } => {
                let prefix_lengths = prefix_lengths.flush_buffer()?;
                let suffix_lengths = suffix_lengths.flush_buffer()?;

                // Page layout: prefix lengths, suffix lengths, suffix bytes
                let mut out =
                    Vec::with_capacity(prefix_lengths.len() + suffix_lengths.len() + buffer.len());
                out.extend_from_slice(&prefix_lengths);
                out.extend_from_slice(&suffix_lengths);
                out.extend_from_slice(buffer);
                buffer.clear();
                last_value.clear();
                (out, Encoding::DELTA_BYTE_ARRAY)
            }
        };

        // Report and reset the accumulated unencoded byte count for this page
        let variable_length_bytes = Some(self.variable_length_bytes);
        self.variable_length_bytes = 0;

        Ok(DataPageValues {
            buf: buf.into(),
            num_values: std::mem::take(&mut self.num_values),
            encoding,
            min_value,
            max_value,
            variable_length_bytes,
        })
    }
}
294
/// [`Storage`] for an [`Interner`] that packs the interned byte arrays into a
/// single buffer laid out as a PLAIN-encoded dictionary page.
#[derive(Debug, Default)]
struct ByteArrayStorage {
    /// Encoded dictionary page: each value prefixed by its u32 length
    page: Vec<u8>,

    /// Byte range within `page` of each distinct value, indexed by key
    /// (range excludes the length prefix)
    values: Vec<std::ops::Range<usize>>,
}
303
304impl Storage for ByteArrayStorage {
305 type Key = u64;
306 type Value = [u8];
307
308 fn get(&self, idx: Self::Key) -> &Self::Value {
309 &self.page[self.values[idx as usize].clone()]
310 }
311
312 fn push(&mut self, value: &Self::Value) -> Self::Key {
313 let key = self.values.len();
314
315 self.page.reserve(4 + value.len());
316 self.page.extend_from_slice((value.len() as u32).as_bytes());
317
318 let start = self.page.len();
319 self.page.extend_from_slice(value);
320 self.values.push(start..self.page.len());
321
322 key as u64
323 }
324
325 #[allow(dead_code)] fn estimated_memory_size(&self) -> usize {
327 self.page.capacity() * std::mem::size_of::<u8>()
328 + self.values.capacity() * std::mem::size_of::<std::ops::Range<usize>>()
329 }
330}
331
/// Dictionary encoder for byte arrays: interns distinct values and buffers a
/// dictionary key per written value.
#[derive(Debug, Default)]
struct DictEncoder {
    /// Interner mapping distinct byte values to u64 dictionary keys
    interner: Interner<ByteArrayStorage>,
    /// Dictionary key of each encoded value, in write order
    indices: Vec<u64>,
    /// Total unencoded byte length of the values written so far
    variable_length_bytes: i64,
}
339
340impl DictEncoder {
341 fn encode<T>(&mut self, values: T, indices: &[usize])
343 where
344 T: ArrayAccessor + Copy,
345 T::Item: AsRef<[u8]>,
346 {
347 self.indices.reserve(indices.len());
348
349 for idx in indices {
350 let value = values.value(*idx);
351 let interned = self.interner.intern(value.as_ref());
352 self.indices.push(interned);
353 self.variable_length_bytes += value.as_ref().len() as i64;
354 }
355 }
356
357 fn bit_width(&self) -> u8 {
358 let length = self.interner.storage().values.len();
359 num_required_bits(length.saturating_sub(1) as u64)
360 }
361
362 fn estimated_memory_size(&self) -> usize {
363 self.interner.estimated_memory_size() + self.indices.capacity() * std::mem::size_of::<u64>()
364 }
365
366 fn estimated_data_page_size(&self) -> usize {
367 let bit_width = self.bit_width();
368 1 + RleEncoder::max_buffer_size(bit_width, self.indices.len())
369 }
370
371 fn estimated_dict_page_size(&self) -> usize {
372 self.interner.storage().page.len()
373 }
374
375 fn flush_dict_page(self) -> DictionaryPage {
376 let storage = self.interner.into_inner();
377
378 DictionaryPage {
379 buf: storage.page.into(),
380 num_values: storage.values.len(),
381 is_sorted: false,
382 }
383 }
384
385 fn flush_data_page(
386 &mut self,
387 min_value: Option<ByteArray>,
388 max_value: Option<ByteArray>,
389 ) -> DataPageValues<ByteArray> {
390 let num_values = self.indices.len();
391 let buffer_len = self.estimated_data_page_size();
392 let mut buffer = Vec::with_capacity(buffer_len);
393 buffer.push(self.bit_width());
394
395 let mut encoder = RleEncoder::new_from_buf(self.bit_width(), buffer);
396 for index in &self.indices {
397 encoder.put(*index)
398 }
399
400 self.indices.clear();
401
402 let variable_length_bytes = Some(self.variable_length_bytes);
404 self.variable_length_bytes = 0;
405
406 DataPageValues {
407 buf: encoder.consume().into(),
408 num_values,
409 encoding: Encoding::RLE_DICTIONARY,
410 min_value,
411 max_value,
412 variable_length_bytes,
413 }
414 }
415}
416
/// [`ColumnValueEncoder`] for variable-length byte arrays, preferring
/// dictionary encoding with a configurable fallback encoding.
pub struct ByteArrayEncoder {
    /// Encoder used when dictionary encoding is disabled or has been flushed
    fallback: FallbackEncoder,
    /// Dictionary encoder; `None` once flushed or if disabled by properties
    dict_encoder: Option<DictEncoder>,
    /// Configured statistics level; min/max are skipped when `None`
    statistics_enabled: EnabledStatistics,
    /// Running minimum of the values written for the current page
    min_value: Option<ByteArray>,
    /// Running maximum of the values written for the current page
    max_value: Option<ByteArray>,
    /// Optional bloom filter fed with every written value
    bloom_filter: Option<Sbbf>,
}
425
impl ColumnValueEncoder for ByteArrayEncoder {
    type T = ByteArray;
    type Values = dyn Array;

    /// Takes the bloom filter out of this encoder, leaving `None`
    fn flush_bloom_filter(&mut self) -> Option<Sbbf> {
        self.bloom_filter.take()
    }

    /// Creates a [`ByteArrayEncoder`] for the column `descr`, configuring the
    /// dictionary encoder, fallback encoding, bloom filter and statistics
    /// level from `props`.
    ///
    /// # Errors
    /// Returns an error if the fallback encoding is unsupported or the bloom
    /// filter cannot be constructed.
    fn try_new(descr: &ColumnDescPtr, props: &WriterProperties) -> Result<Self>
    where
        Self: Sized,
    {
        let dictionary = props
            .dictionary_enabled(descr.path())
            .then(DictEncoder::default);

        let fallback = FallbackEncoder::new(descr, props)?;

        let bloom_filter = props
            .bloom_filter_properties(descr.path())
            .map(|props| Sbbf::new_with_ndv_fpp(props.ndv, props.fpp))
            .transpose()?;

        let statistics_enabled = props.statistics_enabled(descr.path());

        Ok(Self {
            fallback,
            statistics_enabled,
            bloom_filter,
            dict_encoder: dictionary,
            min_value: None,
            max_value: None,
        })
    }

    /// Unsupported for byte arrays — callers must use [`Self::write_gather`]
    fn write(&mut self, _values: &Self::Values, _offset: usize, _len: usize) -> Result<()> {
        unreachable!("should call write_gather instead")
    }

    /// Encodes the values of `values` at the positions in `indices`,
    /// downcasting the array to its concrete byte-array type first
    fn write_gather(&mut self, values: &Self::Values, indices: &[usize]) -> Result<()> {
        downcast_op!(values.data_type(), values, encode, indices, self);
        Ok(())
    }

    /// Number of values buffered for the current data page
    fn num_values(&self) -> usize {
        match &self.dict_encoder {
            Some(encoder) => encoder.indices.len(),
            None => self.fallback.num_values,
        }
    }

    /// Whether the dictionary encoder is still active
    fn has_dictionary(&self) -> bool {
        self.dict_encoder.is_some()
    }

    /// Estimated heap memory used by this encoder, including the bloom
    /// filter and buffered min/max statistics
    fn estimated_memory_size(&self) -> usize {
        let encoder_size = match &self.dict_encoder {
            Some(encoder) => encoder.estimated_memory_size(),
            // NOTE: FallbackEncoder exposes only a data-page size estimate,
            // which is used here as a proxy for its memory usage
            None => self.fallback.estimated_data_page_size(),
        };

        let bloom_filter_size = self
            .bloom_filter
            .as_ref()
            .map(|bf| bf.estimated_memory_size())
            .unwrap_or_default();

        let stats_size = self.min_value.as_ref().map(|v| v.len()).unwrap_or_default()
            + self.max_value.as_ref().map(|v| v.len()).unwrap_or_default();

        encoder_size + bloom_filter_size + stats_size
    }

    /// Estimated dictionary page size, or `None` if not dictionary encoding
    fn estimated_dict_page_size(&self) -> Option<usize> {
        Some(self.dict_encoder.as_ref()?.estimated_dict_page_size())
    }

    /// Estimated encoded size of the current data page
    fn estimated_data_page_size(&self) -> usize {
        match &self.dict_encoder {
            Some(encoder) => encoder.estimated_data_page_size(),
            None => self.fallback.estimated_data_page_size(),
        }
    }

    /// Flushes the dictionary page, consuming the dictionary encoder so
    /// subsequent writes use the fallback encoder.
    ///
    /// # Errors
    /// Returns an error if values are still buffered — data pages must be
    /// flushed before the dictionary.
    fn flush_dict_page(&mut self) -> Result<Option<DictionaryPage>> {
        match self.dict_encoder.take() {
            Some(encoder) => {
                if !encoder.indices.is_empty() {
                    return Err(general_err!(
                        "Must flush data pages before flushing dictionary"
                    ));
                }

                Ok(Some(encoder.flush_dict_page()))
            }
            _ => Ok(None),
        }
    }

    /// Flushes the buffered values as a data page, taking the running
    /// min/max statistics with it
    fn flush_data_page(&mut self) -> Result<DataPageValues<ByteArray>> {
        let min_value = self.min_value.take();
        let max_value = self.max_value.take();

        match &mut self.dict_encoder {
            Some(encoder) => Ok(encoder.flush_data_page(min_value, max_value)),
            _ => self.fallback.flush_data_page(min_value, max_value),
        }
    }
}
540
541fn encode<T>(values: T, indices: &[usize], encoder: &mut ByteArrayEncoder)
545where
546 T: ArrayAccessor + Copy,
547 T::Item: Copy + Ord + AsRef<[u8]>,
548{
549 if encoder.statistics_enabled != EnabledStatistics::None {
550 if let Some((min, max)) = compute_min_max(values, indices.iter().cloned()) {
551 if encoder.min_value.as_ref().map_or(true, |m| m > &min) {
552 encoder.min_value = Some(min);
553 }
554
555 if encoder.max_value.as_ref().map_or(true, |m| m < &max) {
556 encoder.max_value = Some(max);
557 }
558 }
559 }
560
561 if let Some(bloom_filter) = &mut encoder.bloom_filter {
563 let valid = indices.iter().cloned();
564 for idx in valid {
565 bloom_filter.insert(values.value(idx).as_ref());
566 }
567 }
568
569 match &mut encoder.dict_encoder {
570 Some(dict_encoder) => dict_encoder.encode(values, indices),
571 None => encoder.fallback.encode(values, indices),
572 }
573}
574
575fn compute_min_max<T>(
579 array: T,
580 mut valid: impl Iterator<Item = usize>,
581) -> Option<(ByteArray, ByteArray)>
582where
583 T: ArrayAccessor,
584 T::Item: Copy + Ord + AsRef<[u8]>,
585{
586 let first_idx = valid.next()?;
587
588 let first_val = array.value(first_idx);
589 let mut min = first_val;
590 let mut max = first_val;
591 for idx in valid {
592 let val = array.value(idx);
593 min = min.min(val);
594 max = max.max(val);
595 }
596 Some((min.as_ref().to_vec().into(), max.as_ref().to_vec().into()))
597}