1use crate::basic::Encoding;
19use crate::bloom_filter::Sbbf;
20use crate::column::writer::encoder::{
21 ColumnValueEncoder, DataPageValues, DictionaryPage, create_bloom_filter,
22};
23use crate::data_type::{AsBytes, ByteArray, Int32Type};
24use crate::encodings::encoding::{DeltaBitPackEncoder, Encoder};
25use crate::encodings::rle::RleEncoder;
26use crate::errors::{ParquetError, Result};
27use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion};
28use crate::geospatial::accumulator::{GeoStatsAccumulator, try_new_geo_stats_accumulator};
29use crate::geospatial::statistics::GeospatialStatistics;
30use crate::schema::types::ColumnDescPtr;
31use crate::util::bit_util::num_required_bits;
32use crate::util::interner::{Interner, Storage};
33use arrow_array::types::ByteArrayType;
34use arrow_array::{
35 Array, ArrayAccessor, BinaryArray, BinaryViewArray, DictionaryArray, FixedSizeBinaryArray,
36 GenericByteArray, LargeBinaryArray, LargeStringArray, StringArray, StringViewArray,
37};
38use arrow_buffer::{ArrowNativeType, Buffer};
39use arrow_schema::DataType;
40
/// Downcasts `$array` to a `DictionaryArray` keyed by `arrow_array::types::$key`,
/// views its dictionary values as `$val`, and invokes `$op` with that typed
/// dictionary as the first argument followed by every extra `$arg`.
///
/// Both downcasts are unwrapped, so the expansion panics if `$array` is not a
/// dictionary with the given key type or its values are not a `$val`.
macro_rules! downcast_dict_impl {
    ($array:ident, $key:ident, $val:ident, $op:expr $(, $arg:expr)*) => {{
        $op($array
            .as_any()
            .downcast_ref::<DictionaryArray<arrow_array::types::$key>>()
            .unwrap()
            .downcast_dict::<$val>()
            .unwrap()$(, $arg)*)
    }};
}
51
/// Dispatches on the dictionary key `DataType` (`$key_type`, anything that
/// yields a `&DataType` through `as_ref()`) and forwards to
/// `downcast_dict_impl!` with the matching arrow key type.
///
/// The eight signed and unsigned integer key types are handled; any other
/// key type hits `unreachable!()`.
macro_rules! downcast_dict_op {
    ($key_type:expr, $val:ident, $array:ident, $op:expr $(, $arg:expr)*) => {
        match $key_type.as_ref() {
            DataType::UInt8 => downcast_dict_impl!($array, UInt8Type, $val, $op$(, $arg)*),
            DataType::UInt16 => downcast_dict_impl!($array, UInt16Type, $val, $op$(, $arg)*),
            DataType::UInt32 => downcast_dict_impl!($array, UInt32Type, $val, $op$(, $arg)*),
            DataType::UInt64 => downcast_dict_impl!($array, UInt64Type, $val, $op$(, $arg)*),
            DataType::Int8 => downcast_dict_impl!($array, Int8Type, $val, $op$(, $arg)*),
            DataType::Int16 => downcast_dict_impl!($array, Int16Type, $val, $op$(, $arg)*),
            DataType::Int32 => downcast_dict_impl!($array, Int32Type, $val, $op$(, $arg)*),
            DataType::Int64 => downcast_dict_impl!($array, Int64Type, $val, $op$(, $arg)*),
            _ => unreachable!(),
        }
    };
}
67
/// Dispatches on `$data_type`, downcasts `$array` to the matching concrete
/// byte-array type and calls `$op` with the typed array followed by every
/// extra `$arg`.
///
/// Handled types are `Utf8`, `LargeUtf8`, `Utf8View`, `Binary`, `LargeBinary`,
/// `BinaryView`, and `Dictionary` whose value type is `Utf8`, `LargeUtf8`,
/// `Binary`, `LargeBinary` or `FixedSizeBinary` (delegated to
/// `downcast_dict_op!`). Note that view-typed dictionary values are not
/// handled here.
///
/// Any other type hits `unreachable!`, and each downcast is unwrapped, so a
/// `$data_type` that does not describe `$array` panics.
macro_rules! downcast_op {
    ($data_type:expr, $array:ident, $op:expr $(, $arg:expr)*) => {
        match $data_type {
            DataType::Utf8 => $op($array.as_any().downcast_ref::<StringArray>().unwrap()$(, $arg)*),
            DataType::LargeUtf8 => {
                $op($array.as_any().downcast_ref::<LargeStringArray>().unwrap()$(, $arg)*)
            }
            DataType::Utf8View => $op($array.as_any().downcast_ref::<StringViewArray>().unwrap()$(, $arg)*),
            DataType::Binary => {
                $op($array.as_any().downcast_ref::<BinaryArray>().unwrap()$(, $arg)*)
            }
            DataType::LargeBinary => {
                $op($array.as_any().downcast_ref::<LargeBinaryArray>().unwrap()$(, $arg)*)
            }
            DataType::BinaryView => {
                $op($array.as_any().downcast_ref::<BinaryViewArray>().unwrap()$(, $arg)*)
            }
            DataType::Dictionary(key, value) => match value.as_ref() {
                DataType::Utf8 => downcast_dict_op!(key, StringArray, $array, $op$(, $arg)*),
                DataType::LargeUtf8 => {
                    downcast_dict_op!(key, LargeStringArray, $array, $op$(, $arg)*)
                }
                DataType::Binary => downcast_dict_op!(key, BinaryArray, $array, $op$(, $arg)*),
                DataType::LargeBinary => {
                    downcast_dict_op!(key, LargeBinaryArray, $array, $op$(, $arg)*)
                }
                DataType::FixedSizeBinary(_) => {
                    downcast_dict_op!(key, FixedSizeBinaryArray, $array, $op$(, $arg)*)
                }
                d => unreachable!("cannot downcast {} dictionary value to byte array", d),
            },
            d => unreachable!("cannot downcast {} to byte array", d),
        }
    };
}
103
/// Non-dictionary encoder for byte array values.
///
/// Values are encoded eagerly into the buffers held by `encoder`; the
/// counters below describe what has been buffered since the last data page
/// flush.
struct FallbackEncoder {
    /// Encoding-specific buffers and state.
    encoder: FallbackEncoderImpl,
    /// Number of values encoded since the last data page flush.
    num_values: usize,
    /// Sum of the raw byte lengths of the values encoded since the last data
    /// page flush (length prefixes are not included).
    variable_length_bytes: i64,
}
110
/// Per-encoding state of a [`FallbackEncoder`].
///
/// The length encoders are boxed, presumably to keep the enum itself small —
/// NOTE(review): confirm the motivation.
enum FallbackEncoderImpl {
    /// `PLAIN`: every value is appended to `buffer` as its length (written as
    /// a `u32`) followed by its bytes.
    Plain {
        buffer: Vec<u8>,
    },
    /// `DELTA_LENGTH_BYTE_ARRAY`: value bytes are concatenated in `buffer`
    /// while their lengths are fed to a delta bit-pack encoder.
    DeltaLength {
        buffer: Vec<u8>,
        lengths: Box<DeltaBitPackEncoder<Int32Type>>,
    },
    /// `DELTA_BYTE_ARRAY`: each value is split into a prefix shared with the
    /// previously encoded value and a remaining suffix.
    Delta {
        /// Concatenated suffix bytes.
        buffer: Vec<u8>,
        /// Copy of the most recently encoded value, used to compute the next
        /// shared prefix; cleared when a page is flushed.
        last_value: Vec<u8>,
        /// Lengths of the prefixes shared with the previous value.
        prefix_lengths: Box<DeltaBitPackEncoder<Int32Type>>,
        /// Lengths of the suffixes stored in `buffer`.
        suffix_lengths: Box<DeltaBitPackEncoder<Int32Type>>,
    },
}
129
130impl FallbackEncoder {
131 fn new(descr: &ColumnDescPtr, props: &WriterProperties) -> Result<Self> {
133 let encoding =
135 props
136 .encoding(descr.path())
137 .unwrap_or_else(|| match props.writer_version() {
138 WriterVersion::PARQUET_1_0 => Encoding::PLAIN,
139 WriterVersion::PARQUET_2_0 => Encoding::DELTA_BYTE_ARRAY,
140 });
141
142 let encoder = match encoding {
143 Encoding::PLAIN => FallbackEncoderImpl::Plain { buffer: vec![] },
144 Encoding::DELTA_LENGTH_BYTE_ARRAY => FallbackEncoderImpl::DeltaLength {
145 buffer: vec![],
146 lengths: Box::new(DeltaBitPackEncoder::new()),
147 },
148 Encoding::DELTA_BYTE_ARRAY => FallbackEncoderImpl::Delta {
149 buffer: vec![],
150 last_value: vec![],
151 prefix_lengths: Box::new(DeltaBitPackEncoder::new()),
152 suffix_lengths: Box::new(DeltaBitPackEncoder::new()),
153 },
154 _ => {
155 return Err(general_err!(
156 "unsupported encoding {} for byte array",
157 encoding
158 ));
159 }
160 };
161
162 Ok(Self {
163 encoder,
164 num_values: 0,
165 variable_length_bytes: 0,
166 })
167 }
168
169 fn encode<T>(&mut self, values: T, indices: impl ExactSizeIterator<Item = usize>)
171 where
172 T: ArrayAccessor + Copy,
173 T::Item: AsRef<[u8]>,
174 {
175 self.num_values += indices.len();
176 match &mut self.encoder {
177 FallbackEncoderImpl::Plain { buffer } => {
178 for idx in indices {
179 let value = values.value(idx);
180 let value = value.as_ref();
181 buffer.extend_from_slice((value.len() as u32).as_bytes());
182 buffer.extend_from_slice(value);
183 self.variable_length_bytes += value.len() as i64;
184 }
185 }
186 FallbackEncoderImpl::DeltaLength { buffer, lengths } => {
187 for idx in indices {
188 let value = values.value(idx);
189 let value = value.as_ref();
190 lengths.put(&[value.len() as i32]).unwrap();
191 buffer.extend_from_slice(value);
192 self.variable_length_bytes += value.len() as i64;
193 }
194 }
195 FallbackEncoderImpl::Delta {
196 buffer,
197 last_value,
198 prefix_lengths,
199 suffix_lengths,
200 } => {
201 for idx in indices {
202 let value = values.value(idx);
203 let value = value.as_ref();
204 let mut prefix_length = 0;
205
206 while prefix_length < last_value.len()
207 && prefix_length < value.len()
208 && last_value[prefix_length] == value[prefix_length]
209 {
210 prefix_length += 1;
211 }
212
213 let suffix_length = value.len() - prefix_length;
214
215 last_value.clear();
216 last_value.extend_from_slice(value);
217
218 buffer.extend_from_slice(&value[prefix_length..]);
219 prefix_lengths.put(&[prefix_length as i32]).unwrap();
220 suffix_lengths.put(&[suffix_length as i32]).unwrap();
221 self.variable_length_bytes += value.len() as i64;
222 }
223 }
224 }
225 }
226
227 fn estimated_data_page_size(&self) -> usize {
232 match &self.encoder {
233 FallbackEncoderImpl::Plain { buffer, .. } => buffer.len(),
234 FallbackEncoderImpl::DeltaLength { buffer, lengths } => {
235 buffer.len() + lengths.estimated_data_encoded_size()
236 }
237 FallbackEncoderImpl::Delta {
238 buffer,
239 prefix_lengths,
240 suffix_lengths,
241 ..
242 } => {
243 buffer.len()
244 + prefix_lengths.estimated_data_encoded_size()
245 + suffix_lengths.estimated_data_encoded_size()
246 }
247 }
248 }
249
250 fn flush_data_page(
251 &mut self,
252 min_value: Option<ByteArray>,
253 max_value: Option<ByteArray>,
254 ) -> Result<DataPageValues<ByteArray>> {
255 let (buf, encoding) = match &mut self.encoder {
256 FallbackEncoderImpl::Plain { buffer } => (std::mem::take(buffer), Encoding::PLAIN),
257 FallbackEncoderImpl::DeltaLength { buffer, lengths } => {
258 let lengths = lengths.flush_buffer()?;
259
260 let mut out = Vec::with_capacity(lengths.len() + buffer.len());
261 out.extend_from_slice(&lengths);
262 out.extend_from_slice(buffer);
263 buffer.clear();
264 (out, Encoding::DELTA_LENGTH_BYTE_ARRAY)
265 }
266 FallbackEncoderImpl::Delta {
267 buffer,
268 prefix_lengths,
269 suffix_lengths,
270 last_value,
271 } => {
272 let prefix_lengths = prefix_lengths.flush_buffer()?;
273 let suffix_lengths = suffix_lengths.flush_buffer()?;
274
275 let mut out =
276 Vec::with_capacity(prefix_lengths.len() + suffix_lengths.len() + buffer.len());
277 out.extend_from_slice(&prefix_lengths);
278 out.extend_from_slice(&suffix_lengths);
279 out.extend_from_slice(buffer);
280 buffer.clear();
281 last_value.clear();
282 (out, Encoding::DELTA_BYTE_ARRAY)
283 }
284 };
285
286 let variable_length_bytes = Some(self.variable_length_bytes);
288 self.variable_length_bytes = 0;
289
290 Ok(DataPageValues {
291 buf: buf.into(),
292 num_values: std::mem::take(&mut self.num_values),
293 encoding,
294 min_value,
295 max_value,
296 variable_length_bytes,
297 })
298 }
299}
300
/// Backing storage for the dictionary interner: the dictionary page bytes
/// plus the location of every stored value inside them.
#[derive(Debug, Default)]
struct ByteArrayStorage {
    /// Dictionary page contents; every pushed value is appended as its length
    /// (written as a `u32`) followed by its bytes.
    page: Vec<u8>,

    /// For each key, the byte range of the value's payload within `page`
    /// (the length prefix is not part of the range).
    values: Vec<std::ops::Range<usize>>,
}
309
310impl Storage for ByteArrayStorage {
311 type Key = u64;
312 type Value = [u8];
313
314 fn get(&self, idx: Self::Key) -> &Self::Value {
315 &self.page[self.values[idx as usize].clone()]
316 }
317
318 fn push(&mut self, value: &Self::Value) -> Self::Key {
319 let key = self.values.len();
320
321 self.page.reserve(4 + value.len());
322 self.page.extend_from_slice((value.len() as u32).as_bytes());
323
324 let start = self.page.len();
325 self.page.extend_from_slice(value);
326 self.values.push(start..self.page.len());
327
328 key as u64
329 }
330
331 #[allow(dead_code)] fn estimated_memory_size(&self) -> usize {
333 self.page.capacity() * std::mem::size_of::<u8>()
334 + self.values.capacity() * std::mem::size_of::<std::ops::Range<usize>>()
335 }
336}
337
/// Dictionary encoder for byte array values.
#[derive(Debug, Default)]
struct DictEncoder {
    /// Deduplicates values and owns the dictionary page contents.
    interner: Interner<ByteArrayStorage>,
    /// Dictionary keys of the values encoded since the last data page flush.
    indices: Vec<u64>,
    /// Sum of the raw byte lengths of the values encoded since the last data
    /// page flush.
    variable_length_bytes: i64,
}
345
346impl DictEncoder {
347 fn encode<T>(&mut self, values: T, indices: impl ExactSizeIterator<Item = usize>)
349 where
350 T: ArrayAccessor + Copy,
351 T::Item: AsRef<[u8]>,
352 {
353 self.indices.reserve(indices.len());
354
355 for idx in indices {
356 let value = values.value(idx);
357 let interned = self.interner.intern(value.as_ref());
358 self.indices.push(interned);
359 self.variable_length_bytes += value.as_ref().len() as i64;
360 }
361 }
362
363 fn bit_width(&self) -> u8 {
364 let length = self.interner.storage().values.len();
365 num_required_bits(length.saturating_sub(1) as u64)
366 }
367
368 fn estimated_memory_size(&self) -> usize {
369 self.interner.estimated_memory_size() + self.indices.capacity() * std::mem::size_of::<u64>()
370 }
371
372 fn estimated_data_page_size(&self) -> usize {
373 let bit_width = self.bit_width();
374 1 + RleEncoder::max_buffer_size(bit_width, self.indices.len())
375 }
376
377 fn estimated_dict_page_size(&self) -> usize {
378 self.interner.storage().page.len()
379 }
380
381 fn flush_dict_page(self) -> DictionaryPage {
382 let storage = self.interner.into_inner();
383
384 DictionaryPage {
385 buf: storage.page.into(),
386 num_values: storage.values.len(),
387 is_sorted: false,
388 }
389 }
390
391 fn flush_data_page(
392 &mut self,
393 min_value: Option<ByteArray>,
394 max_value: Option<ByteArray>,
395 ) -> DataPageValues<ByteArray> {
396 let num_values = self.indices.len();
397 let buffer_len = self.estimated_data_page_size();
398 let mut buffer = Vec::with_capacity(buffer_len);
399 buffer.push(self.bit_width());
400
401 let mut encoder = RleEncoder::new_from_buf(self.bit_width(), buffer);
402 for index in &self.indices {
403 encoder.put(*index)
404 }
405
406 self.indices.clear();
407
408 let variable_length_bytes = Some(self.variable_length_bytes);
410 self.variable_length_bytes = 0;
411
412 DataPageValues {
413 buf: encoder.consume().into(),
414 num_values,
415 encoding: Encoding::RLE_DICTIONARY,
416 min_value,
417 max_value,
418 variable_length_bytes,
419 }
420 }
421}
422
/// Column value encoder that writes arrow byte arrays (string, binary, their
/// large and view variants, and dictionaries of them) directly, using a
/// dictionary encoder while one is active and a fallback encoder otherwise.
pub struct ByteArrayEncoder {
    /// Non-dictionary encoder, used once `dict_encoder` is `None`.
    fallback: FallbackEncoder,
    /// Dictionary encoder; `None` when dictionary encoding is disabled for
    /// the column or after the dictionary page has been flushed.
    dict_encoder: Option<DictEncoder>,
    /// Statistics level configured for the column; `None` disables min/max
    /// and geospatial statistics collection.
    statistics_enabled: EnabledStatistics,
    /// Smallest value seen since the last data page flush, if tracked.
    min_value: Option<ByteArray>,
    /// Largest value seen since the last data page flush, if tracked.
    max_value: Option<ByteArray>,
    /// Bloom filter receiving every encoded value, if one was created.
    bloom_filter: Option<Sbbf>,
    /// Target false-positive probability the bloom filter is folded to when
    /// it is flushed.
    bloom_filter_target_fpp: f64,
    /// Geospatial statistics accumulator; when present it is fed instead of
    /// computing min/max values.
    geo_stats_accumulator: Option<Box<dyn GeoStatsAccumulator>>,
}
433
434impl ColumnValueEncoder for ByteArrayEncoder {
435 type T = ByteArray;
436 type Values = dyn Array;
437 fn flush_bloom_filter(&mut self) -> Option<Sbbf> {
438 let mut sbbf = self.bloom_filter.take()?;
439 sbbf.fold_to_target_fpp(self.bloom_filter_target_fpp);
440 Some(sbbf)
441 }
442
443 fn try_new(descr: &ColumnDescPtr, props: &WriterProperties) -> Result<Self>
444 where
445 Self: Sized,
446 {
447 let dictionary = props
448 .dictionary_enabled(descr.path())
449 .then(DictEncoder::default);
450
451 let fallback = FallbackEncoder::new(descr, props)?;
452
453 let (bloom_filter, bloom_filter_target_fpp) = create_bloom_filter(props, descr)?;
454
455 let statistics_enabled = props.statistics_enabled(descr.path());
456
457 let geo_stats_accumulator = try_new_geo_stats_accumulator(descr);
458
459 Ok(Self {
460 fallback,
461 statistics_enabled,
462 bloom_filter,
463 bloom_filter_target_fpp,
464 dict_encoder: dictionary,
465 min_value: None,
466 max_value: None,
467 geo_stats_accumulator,
468 })
469 }
470
471 fn write(&mut self, _values: &Self::Values, _offset: usize, _len: usize) -> Result<()> {
472 unreachable!("should call write_gather instead")
473 }
474
475 fn write_gather(&mut self, values: &Self::Values, indices: &[usize]) -> Result<()> {
476 downcast_op!(
477 values.data_type(),
478 values,
479 encode,
480 indices.iter().copied(),
481 self
482 );
483 Ok(())
484 }
485
486 fn count_values_within_byte_budget_gather(
487 values: &Self::Values,
488 indices: &[usize],
489 byte_budget: usize,
490 ) -> Option<usize> {
491 let count = match values.data_type() {
505 DataType::Utf8 => count_within_budget_offsets(
506 values.as_any().downcast_ref::<StringArray>().unwrap(),
507 indices,
508 byte_budget,
509 ),
510 DataType::LargeUtf8 => count_within_budget_offsets(
511 values.as_any().downcast_ref::<LargeStringArray>().unwrap(),
512 indices,
513 byte_budget,
514 ),
515 DataType::Binary => count_within_budget_offsets(
516 values.as_any().downcast_ref::<BinaryArray>().unwrap(),
517 indices,
518 byte_budget,
519 ),
520 DataType::LargeBinary => count_within_budget_offsets(
521 values.as_any().downcast_ref::<LargeBinaryArray>().unwrap(),
522 indices,
523 byte_budget,
524 ),
525 DataType::Utf8View => {
530 let array = values.as_any().downcast_ref::<StringViewArray>().unwrap();
531 count_within_budget_views(
532 array.views(),
533 indices,
534 byte_budget,
535 max_view_value_len(array.data_buffers()),
536 )
537 }
538 DataType::BinaryView => {
539 let array = values.as_any().downcast_ref::<BinaryViewArray>().unwrap();
540 count_within_budget_views(
541 array.views(),
542 indices,
543 byte_budget,
544 max_view_value_len(array.data_buffers()),
545 )
546 }
547 DataType::Dictionary(_, _) => indices.len(),
553 data_type => unreachable!("ByteArrayEncoder cannot be constructed for {data_type:?}"),
560 };
561 Some(count)
562 }
563
564 fn num_values(&self) -> usize {
565 match &self.dict_encoder {
566 Some(encoder) => encoder.indices.len(),
567 None => self.fallback.num_values,
568 }
569 }
570
571 fn has_dictionary(&self) -> bool {
572 self.dict_encoder.is_some()
573 }
574
575 fn estimated_memory_size(&self) -> usize {
576 let encoder_size = match &self.dict_encoder {
577 Some(encoder) => encoder.estimated_memory_size(),
578 None => self.fallback.estimated_data_page_size(),
581 };
582
583 let bloom_filter_size = self
584 .bloom_filter
585 .as_ref()
586 .map(|bf| bf.estimated_memory_size())
587 .unwrap_or_default();
588
589 let stats_size = self.min_value.as_ref().map(|v| v.len()).unwrap_or_default()
590 + self.max_value.as_ref().map(|v| v.len()).unwrap_or_default();
591
592 encoder_size + bloom_filter_size + stats_size
593 }
594
595 fn estimated_dict_page_size(&self) -> Option<usize> {
596 Some(self.dict_encoder.as_ref()?.estimated_dict_page_size())
597 }
598
599 fn estimated_data_page_size(&self) -> usize {
604 match &self.dict_encoder {
605 Some(encoder) => encoder.estimated_data_page_size(),
606 None => self.fallback.estimated_data_page_size(),
607 }
608 }
609
610 fn flush_dict_page(&mut self) -> Result<Option<DictionaryPage>> {
611 match self.dict_encoder.take() {
612 Some(encoder) => {
613 if !encoder.indices.is_empty() {
614 return Err(general_err!(
615 "Must flush data pages before flushing dictionary"
616 ));
617 }
618
619 Ok(Some(encoder.flush_dict_page()))
620 }
621 _ => Ok(None),
622 }
623 }
624
625 fn flush_data_page(&mut self) -> Result<DataPageValues<ByteArray>> {
626 let min_value = self.min_value.take();
627 let max_value = self.max_value.take();
628
629 match &mut self.dict_encoder {
630 Some(encoder) => Ok(encoder.flush_data_page(min_value, max_value)),
631 _ => self.fallback.flush_data_page(min_value, max_value),
632 }
633 }
634
635 fn flush_geospatial_statistics(&mut self) -> Option<Box<GeospatialStatistics>> {
636 self.geo_stats_accumulator.as_mut().map(|a| a.finish())?
637 }
638}
639
640fn encode<T, I>(values: T, indices: I, encoder: &mut ByteArrayEncoder)
644where
645 T: ArrayAccessor + Copy,
646 T::Item: Copy + Ord + AsRef<[u8]>,
647 I: ExactSizeIterator<Item = usize> + Clone,
648{
649 if encoder.statistics_enabled != EnabledStatistics::None {
650 if let Some(accumulator) = encoder.geo_stats_accumulator.as_mut() {
651 update_geo_stats_accumulator(accumulator.as_mut(), values, indices.clone());
652 } else if let Some((min, max)) = compute_min_max(values, indices.clone()) {
653 if encoder.min_value.as_ref().is_none_or(|m| m > &min) {
654 encoder.min_value = Some(min);
655 }
656
657 if encoder.max_value.as_ref().is_none_or(|m| m < &max) {
658 encoder.max_value = Some(max);
659 }
660 }
661 }
662
663 if let Some(bloom_filter) = &mut encoder.bloom_filter {
665 for idx in indices.clone() {
666 bloom_filter.insert(values.value(idx).as_ref());
667 }
668 }
669
670 match &mut encoder.dict_encoder {
671 Some(dict_encoder) => dict_encoder.encode(values, indices),
672 None => encoder.fallback.encode(values, indices),
673 }
674}
675
676fn max_view_value_len(buffers: &[Buffer]) -> usize {
678 const MAX_INLINE_VIEW_LEN: usize = 12;
680 buffers
685 .iter()
686 .map(|b| b.len())
687 .max()
688 .unwrap_or(0)
689 .max(MAX_INLINE_VIEW_LEN)
690}
691
/// Counts how many leading entries of `indices` to take for `byte_budget`
/// when the values come from a view array.
///
/// Each value is charged its length (the low 32 bits of its view) plus a
/// `u32` length prefix.
///
/// * `views` - the array's views; indexed by the entries of `indices`.
/// * `indices` - positions of the values to consider, in encoding order.
/// * `byte_budget` - cumulative size limit in bytes.
/// * `max_value_len` - caller-supplied upper bound on any single value's
///   length, used for a fast path that avoids reading the views.
///
/// Returns `indices.len()` when everything fits. Otherwise returns the count
/// up to and including the first value whose cumulative size exceeds the
/// budget, so a non-empty input never yields zero.
///
/// All size arithmetic saturates, so an arbitrarily large `max_value_len`
/// cannot overflow and wrongly trigger the fast path.
///
/// # Panics
///
/// Panics if an index examined on the slow path is out of bounds for `views`.
fn count_within_budget_views(
    views: &[u128],
    indices: &[usize],
    byte_budget: usize,
    max_value_len: usize,
) -> usize {
    const LENGTH_PREFIX: usize = std::mem::size_of::<u32>();

    // Fast path: even if every value had the maximum length, all of them fit.
    let per_value = max_value_len.saturating_add(LENGTH_PREFIX);
    if indices.len().saturating_mul(per_value) <= byte_budget {
        return indices.len();
    }

    let mut cum: usize = 0;
    for (i, &idx) in indices.iter().enumerate() {
        // The value length lives in the low 32 bits of the view.
        let len = (views[idx] as u32) as usize;
        cum = cum.saturating_add(len).saturating_add(LENGTH_PREFIX);
        if cum > byte_budget {
            // Include the value that crossed the budget.
            return i + 1;
        }
    }
    indices.len()
}
727
728fn count_within_budget_offsets<T: ByteArrayType>(
735 values: &GenericByteArray<T>,
736 indices: &[usize],
737 byte_budget: usize,
738) -> usize {
739 if indices.is_empty() {
740 return 0;
741 }
742 let n = indices.len();
743 let first = indices[0];
744 let last = indices[n - 1];
745 let offsets = values.value_offsets();
746 let prefix_overhead = std::mem::size_of::<u32>();
748
749 if last >= first {
757 let payload = (offsets[last + 1] - offsets[first]).as_usize();
758 if payload + n * prefix_overhead <= byte_budget {
759 return n;
760 }
761 }
762
763 let mut cum: usize = 0;
765 for (i, idx) in indices.iter().enumerate() {
766 let len = (offsets[idx + 1] - offsets[*idx]).as_usize() + prefix_overhead;
767 cum = cum.saturating_add(len);
768 if cum > byte_budget {
769 return i + 1;
770 }
771 }
772 n
773}
774
775fn compute_min_max<T>(
779 array: T,
780 mut valid: impl Iterator<Item = usize>,
781) -> Option<(ByteArray, ByteArray)>
782where
783 T: ArrayAccessor,
784 T::Item: Copy + Ord + AsRef<[u8]>,
785{
786 let first_idx = valid.next()?;
787
788 let first_val = array.value(first_idx);
789 let mut min = first_val;
790 let mut max = first_val;
791 for idx in valid {
792 let val = array.value(idx);
793 min = min.min(val);
794 max = max.max(val);
795 }
796 Some((min.as_ref().to_vec().into(), max.as_ref().to_vec().into()))
797}
798
799fn update_geo_stats_accumulator<T>(
801 bounder: &mut dyn GeoStatsAccumulator,
802 array: T,
803 valid: impl Iterator<Item = usize>,
804) where
805 T: ArrayAccessor,
806 T::Item: Copy + Ord + AsRef<[u8]>,
807{
808 if bounder.is_valid() {
809 for idx in valid {
810 let val = array.value(idx);
811 bounder.update_wkb(val.as_ref());
812 }
813 }
814}