use crate::basic::Encoding;
use crate::bloom_filter::Sbbf;
use crate::column::writer::encoder::{ColumnValueEncoder, DataPageValues, DictionaryPage};
use crate::data_type::{AsBytes, ByteArray, Int32Type};
use crate::encodings::encoding::{DeltaBitPackEncoder, Encoder};
use crate::encodings::rle::RleEncoder;
use crate::errors::{ParquetError, Result};
use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion};
use crate::schema::types::ColumnDescPtr;
use crate::util::bit_util::num_required_bits;
use crate::util::interner::{Interner, Storage};
use arrow_array::{
    Array, ArrayAccessor, BinaryArray, BinaryViewArray, DictionaryArray, LargeBinaryArray,
    LargeStringArray, StringArray, StringViewArray,
};
use arrow_schema::DataType;

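/// Downcasts `$array` to a [`DictionaryArray`] with key type `$key`, views its values as
/// `$val`, and invokes `$op` on the result together with any trailing arguments.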
macro_rules! downcast_dict_impl {
    ($array:ident, $key:ident, $val:ident, $op:expr $(, $arg:expr)*) => {{
        $op($array
            .as_any()
            .downcast_ref::<DictionaryArray<arrow_array::types::$key>>()
            .unwrap()
            .downcast_dict::<$val>()
            .unwrap()$(, $arg)*)
    }};
}

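/// Dispatches on the dictionary key type `$key_type`, invoking [`downcast_dict_impl`]
/// for the matching integer key type.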
macro_rules! downcast_dict_op {
    ($key_type:expr, $val:ident, $array:ident, $op:expr $(, $arg:expr)*) => {
        match $key_type.as_ref() {
            DataType::UInt8 => downcast_dict_impl!($array, UInt8Type, $val, $op$(, $arg)*),
            DataType::UInt16 => downcast_dict_impl!($array, UInt16Type, $val, $op$(, $arg)*),
            DataType::UInt32 => downcast_dict_impl!($array, UInt32Type, $val, $op$(, $arg)*),
            DataType::UInt64 => downcast_dict_impl!($array, UInt64Type, $val, $op$(, $arg)*),
            DataType::Int8 => downcast_dict_impl!($array, Int8Type, $val, $op$(, $arg)*),
            DataType::Int16 => downcast_dict_impl!($array, Int16Type, $val, $op$(, $arg)*),
            DataType::Int32 => downcast_dict_impl!($array, Int32Type, $val, $op$(, $arg)*),
            DataType::Int64 => downcast_dict_impl!($array, Int64Type, $val, $op$(, $arg)*),
            _ => unreachable!(),
        }
    };
}

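/// Downcasts a `dyn Array` of string, binary, string/binary view, or dictionary type to
/// its concrete array type and invokes `$op` on it together with any trailing arguments.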
macro_rules! downcast_op {
    ($data_type:expr, $array:ident, $op:expr $(, $arg:expr)*) => {
        match $data_type {
            DataType::Utf8 => $op($array.as_any().downcast_ref::<StringArray>().unwrap()$(, $arg)*),
            DataType::LargeUtf8 => {
                $op($array.as_any().downcast_ref::<LargeStringArray>().unwrap()$(, $arg)*)
            }
            DataType::Utf8View => $op($array.as_any().downcast_ref::<StringViewArray>().unwrap()$(, $arg)*),
            DataType::Binary => {
                $op($array.as_any().downcast_ref::<BinaryArray>().unwrap()$(, $arg)*)
            }
            DataType::LargeBinary => {
                $op($array.as_any().downcast_ref::<LargeBinaryArray>().unwrap()$(, $arg)*)
            }
            DataType::BinaryView => {
                $op($array.as_any().downcast_ref::<BinaryViewArray>().unwrap()$(, $arg)*)
            }
            DataType::Dictionary(key, value) => match value.as_ref() {
                DataType::Utf8 => downcast_dict_op!(key, StringArray, $array, $op$(, $arg)*),
                DataType::LargeUtf8 => {
                    downcast_dict_op!(key, LargeStringArray, $array, $op$(, $arg)*)
                }
                DataType::Binary => downcast_dict_op!(key, BinaryArray, $array, $op$(, $arg)*),
                DataType::LargeBinary => {
                    downcast_dict_op!(key, LargeBinaryArray, $array, $op$(, $arg)*)
                }
                d => unreachable!("cannot downcast {} dictionary value to byte array", d),
            },
            d => unreachable!("cannot downcast {} to byte array", d),
        }
    };
}

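/// Encoder for byte array data using the non-dictionary fallback encodings
/// (PLAIN, DELTA_LENGTH_BYTE_ARRAY or DELTA_BYTE_ARRAY).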
struct FallbackEncoder {
    encoder: FallbackEncoderImpl,
    num_values: usize,
    variable_length_bytes: i64,
}

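/// The buffered state for each supported fallback encoding.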
enum FallbackEncoderImpl {
    Plain {
        buffer: Vec<u8>,
    },
    DeltaLength {
        buffer: Vec<u8>,
        lengths: Box<DeltaBitPackEncoder<Int32Type>>,
    },
    Delta {
        buffer: Vec<u8>,
        last_value: Vec<u8>,
        prefix_lengths: Box<DeltaBitPackEncoder<Int32Type>>,
        suffix_lengths: Box<DeltaBitPackEncoder<Int32Type>>,
    },
}

impl FallbackEncoder {
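    /// Creates a [`FallbackEncoder`] for the column `descr`, using the encoding configured
    /// in `props` or, if none is set, PLAIN for 1.0 writers and DELTA_BYTE_ARRAY for 2.0 writers.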
    fn new(descr: &ColumnDescPtr, props: &WriterProperties) -> Result<Self> {
        let encoding = props
            .encoding(descr.path())
            .unwrap_or_else(|| match props.writer_version() {
                WriterVersion::PARQUET_1_0 => Encoding::PLAIN,
                WriterVersion::PARQUET_2_0 => Encoding::DELTA_BYTE_ARRAY,
            });

        let encoder = match encoding {
            Encoding::PLAIN => FallbackEncoderImpl::Plain { buffer: vec![] },
            Encoding::DELTA_LENGTH_BYTE_ARRAY => FallbackEncoderImpl::DeltaLength {
                buffer: vec![],
                lengths: Box::new(DeltaBitPackEncoder::new()),
            },
            Encoding::DELTA_BYTE_ARRAY => FallbackEncoderImpl::Delta {
                buffer: vec![],
                last_value: vec![],
                prefix_lengths: Box::new(DeltaBitPackEncoder::new()),
                suffix_lengths: Box::new(DeltaBitPackEncoder::new()),
            },
            _ => {
                return Err(general_err!(
                    "unsupported encoding {} for byte array",
                    encoding
                ))
            }
        };

        Ok(Self {
            encoder,
            num_values: 0,
            variable_length_bytes: 0,
        })
    }

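    /// Encodes the values at `indices` in `values` using the active fallback encoding.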
    fn encode<T>(&mut self, values: T, indices: &[usize])
    where
        T: ArrayAccessor + Copy,
        T::Item: AsRef<[u8]>,
    {
        self.num_values += indices.len();
        match &mut self.encoder {
            FallbackEncoderImpl::Plain { buffer } => {
                for idx in indices {
                    let value = values.value(*idx);
                    let value = value.as_ref();
                    buffer.extend_from_slice((value.len() as u32).as_bytes());
                    buffer.extend_from_slice(value);
                    self.variable_length_bytes += value.len() as i64;
                }
            }
            FallbackEncoderImpl::DeltaLength { buffer, lengths } => {
                for idx in indices {
                    let value = values.value(*idx);
                    let value = value.as_ref();
                    lengths.put(&[value.len() as i32]).unwrap();
                    buffer.extend_from_slice(value);
                    self.variable_length_bytes += value.len() as i64;
                }
            }
            FallbackEncoderImpl::Delta {
                buffer,
                last_value,
                prefix_lengths,
                suffix_lengths,
            } => {
                for idx in indices {
                    let value = values.value(*idx);
                    let value = value.as_ref();
                    let mut prefix_length = 0;

                    while prefix_length < last_value.len()
                        && prefix_length < value.len()
                        && last_value[prefix_length] == value[prefix_length]
                    {
                        prefix_length += 1;
                    }

                    let suffix_length = value.len() - prefix_length;

                    last_value.clear();
                    last_value.extend_from_slice(value);

                    buffer.extend_from_slice(&value[prefix_length..]);
                    prefix_lengths.put(&[prefix_length as i32]).unwrap();
                    suffix_lengths.put(&[suffix_length as i32]).unwrap();
                    self.variable_length_bytes += value.len() as i64;
                }
            }
        }
    }

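    /// Returns an estimate of the encoded size of the currently buffered data page, in bytes.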
    fn estimated_data_page_size(&self) -> usize {
        match &self.encoder {
            FallbackEncoderImpl::Plain { buffer, .. } => buffer.len(),
            FallbackEncoderImpl::DeltaLength { buffer, lengths } => {
                buffer.len() + lengths.estimated_data_encoded_size()
            }
            FallbackEncoderImpl::Delta {
                buffer,
                prefix_lengths,
                suffix_lengths,
                ..
            } => {
                buffer.len()
                    + prefix_lengths.estimated_data_encoded_size()
                    + suffix_lengths.estimated_data_encoded_size()
            }
        }
    }

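    /// Flushes the buffered values into a [`DataPageValues`], resetting the encoder state.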
    fn flush_data_page(
        &mut self,
        min_value: Option<ByteArray>,
        max_value: Option<ByteArray>,
    ) -> Result<DataPageValues<ByteArray>> {
        let (buf, encoding) = match &mut self.encoder {
            FallbackEncoderImpl::Plain { buffer } => (std::mem::take(buffer), Encoding::PLAIN),
            FallbackEncoderImpl::DeltaLength { buffer, lengths } => {
                let lengths = lengths.flush_buffer()?;

                let mut out = Vec::with_capacity(lengths.len() + buffer.len());
                out.extend_from_slice(&lengths);
                out.extend_from_slice(buffer);
                buffer.clear();
                (out, Encoding::DELTA_LENGTH_BYTE_ARRAY)
            }
            FallbackEncoderImpl::Delta {
                buffer,
                prefix_lengths,
                suffix_lengths,
                last_value,
            } => {
                let prefix_lengths = prefix_lengths.flush_buffer()?;
                let suffix_lengths = suffix_lengths.flush_buffer()?;

                let mut out =
                    Vec::with_capacity(prefix_lengths.len() + suffix_lengths.len() + buffer.len());
                out.extend_from_slice(&prefix_lengths);
                out.extend_from_slice(&suffix_lengths);
                out.extend_from_slice(buffer);
                buffer.clear();
                last_value.clear();
                (out, Encoding::DELTA_BYTE_ARRAY)
            }
        };

        let variable_length_bytes = Some(self.variable_length_bytes);
        self.variable_length_bytes = 0;

        Ok(DataPageValues {
            buf: buf.into(),
            num_values: std::mem::take(&mut self.num_values),
            encoding,
            min_value,
            max_value,
            variable_length_bytes,
        })
    }
}

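/// [`Storage`] for an [`Interner`] of byte arrays: values are appended to a single buffer,
/// each prefixed with its `u32` length, so the buffer can later serve as a dictionary page.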
#[derive(Debug, Default)]
struct ByteArrayStorage {
    page: Vec<u8>,

    values: Vec<std::ops::Range<usize>>,
}

impl Storage for ByteArrayStorage {
    type Key = u64;
    type Value = [u8];

    fn get(&self, idx: Self::Key) -> &Self::Value {
        &self.page[self.values[idx as usize].clone()]
    }

    fn push(&mut self, value: &Self::Value) -> Self::Key {
        let key = self.values.len();

        self.page.reserve(4 + value.len());
        self.page.extend_from_slice((value.len() as u32).as_bytes());

        let start = self.page.len();
        self.page.extend_from_slice(value);
        self.values.push(start..self.page.len());

        key as u64
    }

    #[allow(dead_code)]
    fn estimated_memory_size(&self) -> usize {
        self.page.capacity() * std::mem::size_of::<u8>()
            + self.values.capacity() * std::mem::size_of::<std::ops::Range<usize>>()
    }
}

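/// Dictionary encoder for byte array data, interning each distinct value and buffering the
/// dictionary index of every encoded value.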
#[derive(Debug, Default)]
struct DictEncoder {
    interner: Interner<ByteArrayStorage>,
    indices: Vec<u64>,
    variable_length_bytes: i64,
}

impl DictEncoder {
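    /// Interns the values at `indices` in `values`, buffering their dictionary indices.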
    fn encode<T>(&mut self, values: T, indices: &[usize])
    where
        T: ArrayAccessor + Copy,
        T::Item: AsRef<[u8]>,
    {
        self.indices.reserve(indices.len());

        for idx in indices {
            let value = values.value(*idx);
            let interned = self.interner.intern(value.as_ref());
            self.indices.push(interned);
            self.variable_length_bytes += value.as_ref().len() as i64;
        }
    }

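    /// Returns the number of bits required to encode an index into the current dictionary.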
    fn bit_width(&self) -> u8 {
        let length = self.interner.storage().values.len();
        num_required_bits(length.saturating_sub(1) as u64)
    }

    fn estimated_memory_size(&self) -> usize {
        self.interner.estimated_memory_size() + self.indices.capacity() * std::mem::size_of::<u64>()
    }

    fn estimated_data_page_size(&self) -> usize {
        let bit_width = self.bit_width();
        1 + RleEncoder::max_buffer_size(bit_width, self.indices.len())
    }

    fn estimated_dict_page_size(&self) -> usize {
        self.interner.storage().page.len()
    }

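    /// Consumes the encoder, returning the interned values as a [`DictionaryPage`].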
    fn flush_dict_page(self) -> DictionaryPage {
        let storage = self.interner.into_inner();

        DictionaryPage {
            buf: storage.page.into(),
            num_values: storage.values.len(),
            is_sorted: false,
        }
    }

    fn flush_data_page(
        &mut self,
        min_value: Option<ByteArray>,
        max_value: Option<ByteArray>,
    ) -> DataPageValues<ByteArray> {
        let num_values = self.indices.len();
        let buffer_len = self.estimated_data_page_size();
        let mut buffer = Vec::with_capacity(buffer_len);
        buffer.push(self.bit_width());

        let mut encoder = RleEncoder::new_from_buf(self.bit_width(), buffer);
        for index in &self.indices {
            encoder.put(*index)
        }

        self.indices.clear();

        let variable_length_bytes = Some(self.variable_length_bytes);
        self.variable_length_bytes = 0;

        DataPageValues {
            buf: encoder.consume().into(),
            num_values,
            encoding: Encoding::RLE_DICTIONARY,
            min_value,
            max_value,
            variable_length_bytes,
        }
    }
}

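/// A [`ColumnValueEncoder`] for variable-length byte array data, writing with an optional
/// [`DictEncoder`] and falling back to a [`FallbackEncoder`] when dictionary encoding is
/// not in use.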
pub struct ByteArrayEncoder {
    fallback: FallbackEncoder,
    dict_encoder: Option<DictEncoder>,
    statistics_enabled: EnabledStatistics,
    min_value: Option<ByteArray>,
    max_value: Option<ByteArray>,
    bloom_filter: Option<Sbbf>,
}

impl ColumnValueEncoder for ByteArrayEncoder {
    type T = ByteArray;
    type Values = dyn Array;
    fn flush_bloom_filter(&mut self) -> Option<Sbbf> {
        self.bloom_filter.take()
    }

    fn try_new(descr: &ColumnDescPtr, props: &WriterProperties) -> Result<Self>
    where
        Self: Sized,
    {
        let dictionary = props
            .dictionary_enabled(descr.path())
            .then(DictEncoder::default);

        let fallback = FallbackEncoder::new(descr, props)?;

        let bloom_filter = props
            .bloom_filter_properties(descr.path())
            .map(|props| Sbbf::new_with_ndv_fpp(props.ndv, props.fpp))
            .transpose()?;

        let statistics_enabled = props.statistics_enabled(descr.path());

        Ok(Self {
            fallback,
            statistics_enabled,
            bloom_filter,
            dict_encoder: dictionary,
            min_value: None,
            max_value: None,
        })
    }

    fn write(&mut self, _values: &Self::Values, _offset: usize, _len: usize) -> Result<()> {
        unreachable!("should call write_gather instead")
    }

    fn write_gather(&mut self, values: &Self::Values, indices: &[usize]) -> Result<()> {
        downcast_op!(values.data_type(), values, encode, indices, self);
        Ok(())
    }

    fn num_values(&self) -> usize {
        match &self.dict_encoder {
            Some(encoder) => encoder.indices.len(),
            None => self.fallback.num_values,
        }
    }

    fn has_dictionary(&self) -> bool {
        self.dict_encoder.is_some()
    }

    fn estimated_memory_size(&self) -> usize {
        let encoder_size = match &self.dict_encoder {
            Some(encoder) => encoder.estimated_memory_size(),
            None => self.fallback.estimated_data_page_size(),
        };

        let bloom_filter_size = self
            .bloom_filter
            .as_ref()
            .map(|bf| bf.estimated_memory_size())
            .unwrap_or_default();

        let stats_size = self.min_value.as_ref().map(|v| v.len()).unwrap_or_default()
            + self.max_value.as_ref().map(|v| v.len()).unwrap_or_default();

        encoder_size + bloom_filter_size + stats_size
    }

    fn estimated_dict_page_size(&self) -> Option<usize> {
        Some(self.dict_encoder.as_ref()?.estimated_dict_page_size())
    }

    fn estimated_data_page_size(&self) -> usize {
        match &self.dict_encoder {
            Some(encoder) => encoder.estimated_data_page_size(),
            None => self.fallback.estimated_data_page_size(),
        }
    }

    fn flush_dict_page(&mut self) -> Result<Option<DictionaryPage>> {
        match self.dict_encoder.take() {
            Some(encoder) => {
                if !encoder.indices.is_empty() {
                    return Err(general_err!(
                        "Must flush data pages before flushing dictionary"
                    ));
                }

                Ok(Some(encoder.flush_dict_page()))
            }
            _ => Ok(None),
        }
    }

    fn flush_data_page(&mut self) -> Result<DataPageValues<ByteArray>> {
        let min_value = self.min_value.take();
        let max_value = self.max_value.take();

        match &mut self.dict_encoder {
            Some(encoder) => Ok(encoder.flush_data_page(min_value, max_value)),
            _ => self.fallback.flush_data_page(min_value, max_value),
        }
    }
}

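/// Encodes the values at `indices` in `values` to `encoder`, updating the min/max statistics
/// and bloom filter if enabled.
///
/// This is a free function so that it can be passed as the `$op` of the `downcast_op!` macro.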
fn encode<T>(values: T, indices: &[usize], encoder: &mut ByteArrayEncoder)
where
    T: ArrayAccessor + Copy,
    T::Item: Copy + Ord + AsRef<[u8]>,
{
    if encoder.statistics_enabled != EnabledStatistics::None {
        if let Some((min, max)) = compute_min_max(values, indices.iter().cloned()) {
            if encoder.min_value.as_ref().map_or(true, |m| m > &min) {
                encoder.min_value = Some(min);
            }

            if encoder.max_value.as_ref().map_or(true, |m| m < &max) {
                encoder.max_value = Some(max);
            }
        }
    }

    if let Some(bloom_filter) = &mut encoder.bloom_filter {
        let valid = indices.iter().cloned();
        for idx in valid {
            bloom_filter.insert(values.value(idx).as_ref());
        }
    }

    match &mut encoder.dict_encoder {
        Some(dict_encoder) => dict_encoder.encode(values, indices),
        None => encoder.fallback.encode(values, indices),
    }
}

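/// Returns the minimum and maximum values in `array` at the indices yielded by `valid`,
/// or `None` if `valid` is empty.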
fn compute_min_max<T>(
    array: T,
    mut valid: impl Iterator<Item = usize>,
) -> Option<(ByteArray, ByteArray)>
where
    T: ArrayAccessor,
    T::Item: Copy + Ord + AsRef<[u8]>,
{
    let first_idx = valid.next()?;

    let first_val = array.value(first_idx);
    let mut min = first_val;
    let mut max = first_val;
    for idx in valid {
        let val = array.value(idx);
        min = min.min(val);
        max = max.max(val);
    }
    Some((min.as_ref().to_vec().into(), max.as_ref().to_vec().into()))
594}