Skip to main content

parquet/arrow/arrow_writer/
byte_array.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::basic::Encoding;
19use crate::bloom_filter::Sbbf;
20use crate::column::writer::encoder::{
21    ColumnValueEncoder, DataPageValues, DictionaryPage, create_bloom_filter,
22};
23use crate::data_type::{AsBytes, ByteArray, Int32Type};
24use crate::encodings::encoding::{DeltaBitPackEncoder, Encoder};
25use crate::encodings::rle::RleEncoder;
26use crate::errors::{ParquetError, Result};
27use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion};
28use crate::geospatial::accumulator::{GeoStatsAccumulator, try_new_geo_stats_accumulator};
29use crate::geospatial::statistics::GeospatialStatistics;
30use crate::schema::types::ColumnDescPtr;
31use crate::util::bit_util::num_required_bits;
32use crate::util::interner::{Interner, Storage};
33use arrow_array::types::ByteArrayType;
34use arrow_array::{
35    Array, ArrayAccessor, BinaryArray, BinaryViewArray, DictionaryArray, FixedSizeBinaryArray,
36    GenericByteArray, LargeBinaryArray, LargeStringArray, StringArray, StringViewArray,
37};
38use arrow_buffer::{ArrowNativeType, Buffer};
39use arrow_schema::DataType;
40
/// Downcasts `$array` to a `DictionaryArray` keyed by
/// `arrow_array::types::$key`, views its values as a `$val` via
/// `downcast_dict`, and calls `$op` with that typed dictionary followed by
/// any extra `$arg`s.
///
/// Both `unwrap`s panic on a type mismatch. `downcast_dict_op!` picks `$key`
/// from the array's key `DataType`, and `downcast_op!` picks `$val` from its
/// value `DataType`, so a mismatch indicates a bug in the caller.
macro_rules! downcast_dict_impl {
    ($array:ident, $key:ident, $val:ident, $op:expr $(, $arg:expr)*) => {{
        $op($array
            .as_any()
            .downcast_ref::<DictionaryArray<arrow_array::types::$key>>()
            .unwrap()
            .downcast_dict::<$val>()
            .unwrap()$(, $arg)*)
    }};
}
51
/// Dispatches on a dictionary key type and expands `downcast_dict_impl!`
/// with the matching Arrow key type.
///
/// `$key_type` must support `.as_ref()` yielding a `&DataType`; `downcast_op!`
/// passes the boxed key type of a `DataType::Dictionary`. Only the eight
/// integer key types are handled; any other key type hits `unreachable!()`.
macro_rules! downcast_dict_op {
    ($key_type:expr, $val:ident, $array:ident, $op:expr $(, $arg:expr)*) => {
        match $key_type.as_ref() {
            DataType::UInt8 => downcast_dict_impl!($array, UInt8Type, $val, $op$(, $arg)*),
            DataType::UInt16 => downcast_dict_impl!($array, UInt16Type, $val, $op$(, $arg)*),
            DataType::UInt32 => downcast_dict_impl!($array, UInt32Type, $val, $op$(, $arg)*),
            DataType::UInt64 => downcast_dict_impl!($array, UInt64Type, $val, $op$(, $arg)*),
            DataType::Int8 => downcast_dict_impl!($array, Int8Type, $val, $op$(, $arg)*),
            DataType::Int16 => downcast_dict_impl!($array, Int16Type, $val, $op$(, $arg)*),
            DataType::Int32 => downcast_dict_impl!($array, Int32Type, $val, $op$(, $arg)*),
            DataType::Int64 => downcast_dict_impl!($array, Int64Type, $val, $op$(, $arg)*),
            _ => unreachable!(),
        }
    };
}
67
/// Downcasts `$array` (a `dyn Array`) to the concrete byte-array type named by
/// `$data_type` and calls `$op(typed_array, $arg...)`.
///
/// Handled types:
/// - `Utf8`, `LargeUtf8`, `Utf8View`, `Binary`, `LargeBinary`, `BinaryView`
/// - `Dictionary` arrays whose values are `Utf8`, `LargeUtf8`, `Binary`,
///   `LargeBinary` or `FixedSizeBinary`, dispatched on their key type through
///   `downcast_dict_op!`
///
/// Any other type panics via `unreachable!`. This includes dictionaries with
/// `Utf8View`/`BinaryView` values.
macro_rules! downcast_op {
    ($data_type:expr, $array:ident, $op:expr $(, $arg:expr)*) => {
        match $data_type {
            DataType::Utf8 => $op($array.as_any().downcast_ref::<StringArray>().unwrap()$(, $arg)*),
            DataType::LargeUtf8 => {
                $op($array.as_any().downcast_ref::<LargeStringArray>().unwrap()$(, $arg)*)
            }
            DataType::Utf8View => $op($array.as_any().downcast_ref::<StringViewArray>().unwrap()$(, $arg)*),
            DataType::Binary => {
                $op($array.as_any().downcast_ref::<BinaryArray>().unwrap()$(, $arg)*)
            }
            DataType::LargeBinary => {
                $op($array.as_any().downcast_ref::<LargeBinaryArray>().unwrap()$(, $arg)*)
            }
            DataType::BinaryView => {
                $op($array.as_any().downcast_ref::<BinaryViewArray>().unwrap()$(, $arg)*)
            }
            // Dictionary arrays: pick the value type here, the key type in
            // `downcast_dict_op!`
            DataType::Dictionary(key, value) => match value.as_ref() {
                DataType::Utf8 => downcast_dict_op!(key, StringArray, $array, $op$(, $arg)*),
                DataType::LargeUtf8 => {
                    downcast_dict_op!(key, LargeStringArray, $array, $op$(, $arg)*)
                }
                DataType::Binary => downcast_dict_op!(key, BinaryArray, $array, $op$(, $arg)*),
                DataType::LargeBinary => {
                    downcast_dict_op!(key, LargeBinaryArray, $array, $op$(, $arg)*)
                }
                DataType::FixedSizeBinary(_) => {
                    downcast_dict_op!(key, FixedSizeBinaryArray, $array, $op$(, $arg)*)
                }
                d => unreachable!("cannot downcast {} dictionary value to byte array", d),
            },
            d => unreachable!("cannot downcast {} to byte array", d),
        }
    };
}
103
/// A fallback encoder, i.e. non-dictionary, for [`ByteArray`]
struct FallbackEncoder {
    /// Encoding-specific buffers for the in-progress data page
    encoder: FallbackEncoderImpl,
    /// Number of values encoded into the in-progress page; reset when the
    /// page is flushed
    num_values: usize,
    /// Sum of the raw lengths of the values in the in-progress page,
    /// excluding any length prefixes; reported with the flushed page and
    /// then reset to zero
    variable_length_bytes: i64,
}
110
/// The fallback encoder in use
///
/// Note: DeltaBitPackEncoder is boxed as it is rather large
enum FallbackEncoderImpl {
    /// `PLAIN`: `buffer` holds each value as a 4-byte (`u32`) length followed
    /// by the value's bytes
    Plain {
        buffer: Vec<u8>,
    },
    /// `DELTA_LENGTH_BYTE_ARRAY`: the value lengths are fed to `lengths`,
    /// and `buffer` holds the concatenated value bytes
    DeltaLength {
        buffer: Vec<u8>,
        lengths: Box<DeltaBitPackEncoder<Int32Type>>,
    },
    /// `DELTA_BYTE_ARRAY`: each value is split into the prefix it shares with
    /// the previous value (`last_value`) and the remaining suffix.
    /// `prefix_lengths`/`suffix_lengths` receive the two lengths, and
    /// `buffer` holds only the suffix bytes.
    Delta {
        buffer: Vec<u8>,
        last_value: Vec<u8>,
        prefix_lengths: Box<DeltaBitPackEncoder<Int32Type>>,
        suffix_lengths: Box<DeltaBitPackEncoder<Int32Type>>,
    },
}
129
130impl FallbackEncoder {
131    /// Create the fallback encoder for the given [`ColumnDescPtr`] and [`WriterProperties`]
132    fn new(descr: &ColumnDescPtr, props: &WriterProperties) -> Result<Self> {
133        // Set either main encoder or fallback encoder.
134        let encoding =
135            props
136                .encoding(descr.path())
137                .unwrap_or_else(|| match props.writer_version() {
138                    WriterVersion::PARQUET_1_0 => Encoding::PLAIN,
139                    WriterVersion::PARQUET_2_0 => Encoding::DELTA_BYTE_ARRAY,
140                });
141
142        let encoder = match encoding {
143            Encoding::PLAIN => FallbackEncoderImpl::Plain { buffer: vec![] },
144            Encoding::DELTA_LENGTH_BYTE_ARRAY => FallbackEncoderImpl::DeltaLength {
145                buffer: vec![],
146                lengths: Box::new(DeltaBitPackEncoder::new()),
147            },
148            Encoding::DELTA_BYTE_ARRAY => FallbackEncoderImpl::Delta {
149                buffer: vec![],
150                last_value: vec![],
151                prefix_lengths: Box::new(DeltaBitPackEncoder::new()),
152                suffix_lengths: Box::new(DeltaBitPackEncoder::new()),
153            },
154            _ => {
155                return Err(general_err!(
156                    "unsupported encoding {} for byte array",
157                    encoding
158                ));
159            }
160        };
161
162        Ok(Self {
163            encoder,
164            num_values: 0,
165            variable_length_bytes: 0,
166        })
167    }
168
169    /// Encode `values` to the in-progress page
170    fn encode<T>(&mut self, values: T, indices: impl ExactSizeIterator<Item = usize>)
171    where
172        T: ArrayAccessor + Copy,
173        T::Item: AsRef<[u8]>,
174    {
175        self.num_values += indices.len();
176        match &mut self.encoder {
177            FallbackEncoderImpl::Plain { buffer } => {
178                for idx in indices {
179                    let value = values.value(idx);
180                    let value = value.as_ref();
181                    buffer.extend_from_slice((value.len() as u32).as_bytes());
182                    buffer.extend_from_slice(value);
183                    self.variable_length_bytes += value.len() as i64;
184                }
185            }
186            FallbackEncoderImpl::DeltaLength { buffer, lengths } => {
187                for idx in indices {
188                    let value = values.value(idx);
189                    let value = value.as_ref();
190                    lengths.put(&[value.len() as i32]).unwrap();
191                    buffer.extend_from_slice(value);
192                    self.variable_length_bytes += value.len() as i64;
193                }
194            }
195            FallbackEncoderImpl::Delta {
196                buffer,
197                last_value,
198                prefix_lengths,
199                suffix_lengths,
200            } => {
201                for idx in indices {
202                    let value = values.value(idx);
203                    let value = value.as_ref();
204                    let mut prefix_length = 0;
205
206                    while prefix_length < last_value.len()
207                        && prefix_length < value.len()
208                        && last_value[prefix_length] == value[prefix_length]
209                    {
210                        prefix_length += 1;
211                    }
212
213                    let suffix_length = value.len() - prefix_length;
214
215                    last_value.clear();
216                    last_value.extend_from_slice(value);
217
218                    buffer.extend_from_slice(&value[prefix_length..]);
219                    prefix_lengths.put(&[prefix_length as i32]).unwrap();
220                    suffix_lengths.put(&[suffix_length as i32]).unwrap();
221                    self.variable_length_bytes += value.len() as i64;
222                }
223            }
224        }
225    }
226
227    /// Returns an estimate of the data page size in bytes
228    ///
229    /// This includes:
230    /// <already_written_encoded_byte_size> + <estimated_encoded_size_of_unflushed_bytes>
231    fn estimated_data_page_size(&self) -> usize {
232        match &self.encoder {
233            FallbackEncoderImpl::Plain { buffer, .. } => buffer.len(),
234            FallbackEncoderImpl::DeltaLength { buffer, lengths } => {
235                buffer.len() + lengths.estimated_data_encoded_size()
236            }
237            FallbackEncoderImpl::Delta {
238                buffer,
239                prefix_lengths,
240                suffix_lengths,
241                ..
242            } => {
243                buffer.len()
244                    + prefix_lengths.estimated_data_encoded_size()
245                    + suffix_lengths.estimated_data_encoded_size()
246            }
247        }
248    }
249
250    fn flush_data_page(
251        &mut self,
252        min_value: Option<ByteArray>,
253        max_value: Option<ByteArray>,
254    ) -> Result<DataPageValues<ByteArray>> {
255        let (buf, encoding) = match &mut self.encoder {
256            FallbackEncoderImpl::Plain { buffer } => (std::mem::take(buffer), Encoding::PLAIN),
257            FallbackEncoderImpl::DeltaLength { buffer, lengths } => {
258                let lengths = lengths.flush_buffer()?;
259
260                let mut out = Vec::with_capacity(lengths.len() + buffer.len());
261                out.extend_from_slice(&lengths);
262                out.extend_from_slice(buffer);
263                buffer.clear();
264                (out, Encoding::DELTA_LENGTH_BYTE_ARRAY)
265            }
266            FallbackEncoderImpl::Delta {
267                buffer,
268                prefix_lengths,
269                suffix_lengths,
270                last_value,
271            } => {
272                let prefix_lengths = prefix_lengths.flush_buffer()?;
273                let suffix_lengths = suffix_lengths.flush_buffer()?;
274
275                let mut out =
276                    Vec::with_capacity(prefix_lengths.len() + suffix_lengths.len() + buffer.len());
277                out.extend_from_slice(&prefix_lengths);
278                out.extend_from_slice(&suffix_lengths);
279                out.extend_from_slice(buffer);
280                buffer.clear();
281                last_value.clear();
282                (out, Encoding::DELTA_BYTE_ARRAY)
283            }
284        };
285
286        // Capture value of variable_length_bytes and reset for next page
287        let variable_length_bytes = Some(self.variable_length_bytes);
288        self.variable_length_bytes = 0;
289
290        Ok(DataPageValues {
291            buf: buf.into(),
292            num_values: std::mem::take(&mut self.num_values),
293            encoding,
294            min_value,
295            max_value,
296            variable_length_bytes,
297        })
298    }
299}
300
/// [`Storage`] for the [`Interner`] used by [`DictEncoder`]
#[derive(Debug, Default)]
struct ByteArrayStorage {
    /// Encoded dictionary data
    ///
    /// Each pushed value is appended as a 4-byte (`u32`) length followed by
    /// its bytes. `DictEncoder::flush_dict_page` hands this buffer over
    /// unchanged as the dictionary page.
    page: Vec<u8>,

    /// Byte range within `page` of each stored value, excluding its length
    /// prefix, indexed by the value's key
    values: Vec<std::ops::Range<usize>>,
}
309
310impl Storage for ByteArrayStorage {
311    type Key = u64;
312    type Value = [u8];
313
314    fn get(&self, idx: Self::Key) -> &Self::Value {
315        &self.page[self.values[idx as usize].clone()]
316    }
317
318    fn push(&mut self, value: &Self::Value) -> Self::Key {
319        let key = self.values.len();
320
321        self.page.reserve(4 + value.len());
322        self.page.extend_from_slice((value.len() as u32).as_bytes());
323
324        let start = self.page.len();
325        self.page.extend_from_slice(value);
326        self.values.push(start..self.page.len());
327
328        key as u64
329    }
330
331    #[allow(dead_code)] // not used in parquet_derive, so is dead there
332    fn estimated_memory_size(&self) -> usize {
333        self.page.capacity() * std::mem::size_of::<u8>()
334            + self.values.capacity() * std::mem::size_of::<std::ops::Range<usize>>()
335    }
336}
337
/// A dictionary encoder for byte array data
#[derive(Debug, Default)]
struct DictEncoder {
    /// Maps each value to a key in its `ByteArrayStorage`. Presumably repeated
    /// values share one key; see `Interner`.
    interner: Interner<ByteArrayStorage>,
    /// Dictionary key of every value in the in-progress data page, in write
    /// order; cleared when the data page is flushed
    indices: Vec<u64>,
    /// Sum of the raw lengths of the values in the in-progress data page;
    /// reported with the flushed page and then reset to zero
    variable_length_bytes: i64,
}
345
346impl DictEncoder {
347    /// Encode `values` to the in-progress page
348    fn encode<T>(&mut self, values: T, indices: impl ExactSizeIterator<Item = usize>)
349    where
350        T: ArrayAccessor + Copy,
351        T::Item: AsRef<[u8]>,
352    {
353        self.indices.reserve(indices.len());
354
355        for idx in indices {
356            let value = values.value(idx);
357            let interned = self.interner.intern(value.as_ref());
358            self.indices.push(interned);
359            self.variable_length_bytes += value.as_ref().len() as i64;
360        }
361    }
362
363    fn bit_width(&self) -> u8 {
364        let length = self.interner.storage().values.len();
365        num_required_bits(length.saturating_sub(1) as u64)
366    }
367
368    fn estimated_memory_size(&self) -> usize {
369        self.interner.estimated_memory_size() + self.indices.capacity() * std::mem::size_of::<u64>()
370    }
371
372    fn estimated_data_page_size(&self) -> usize {
373        let bit_width = self.bit_width();
374        1 + RleEncoder::max_buffer_size(bit_width, self.indices.len())
375    }
376
377    fn estimated_dict_page_size(&self) -> usize {
378        self.interner.storage().page.len()
379    }
380
381    fn flush_dict_page(self) -> DictionaryPage {
382        let storage = self.interner.into_inner();
383
384        DictionaryPage {
385            buf: storage.page.into(),
386            num_values: storage.values.len(),
387            is_sorted: false,
388        }
389    }
390
391    fn flush_data_page(
392        &mut self,
393        min_value: Option<ByteArray>,
394        max_value: Option<ByteArray>,
395    ) -> DataPageValues<ByteArray> {
396        let num_values = self.indices.len();
397        let buffer_len = self.estimated_data_page_size();
398        let mut buffer = Vec::with_capacity(buffer_len);
399        buffer.push(self.bit_width());
400
401        let mut encoder = RleEncoder::new_from_buf(self.bit_width(), buffer);
402        for index in &self.indices {
403            encoder.put(*index)
404        }
405
406        self.indices.clear();
407
408        // Capture value of variable_length_bytes and reset for next page
409        let variable_length_bytes = Some(self.variable_length_bytes);
410        self.variable_length_bytes = 0;
411
412        DataPageValues {
413            buf: encoder.consume().into(),
414            num_values,
415            encoding: Encoding::RLE_DICTIONARY,
416            min_value,
417            max_value,
418            variable_length_bytes,
419        }
420    }
421}
422
/// Column value encoder for Parquet [`ByteArray`] columns written from Arrow
/// string/binary arrays.
///
/// Values go to the dictionary encoder while one is present, and to the
/// fallback encoder otherwise. Alongside encoding it accumulates per-page
/// min/max (or geospatial) statistics and an optional bloom filter.
pub struct ByteArrayEncoder {
    /// Non-dictionary encoder, used whenever `dict_encoder` is `None`
    fallback: FallbackEncoder,
    /// Dictionary encoder. `Some` only when dictionary encoding is enabled for
    /// the column and the dictionary page has not been flushed yet.
    dict_encoder: Option<DictEncoder>,
    /// Statistics level for this column; `EnabledStatistics::None` skips
    /// min/max and geospatial statistics entirely
    statistics_enabled: EnabledStatistics,
    /// Running minimum for the in-progress page, taken when the page is flushed
    min_value: Option<ByteArray>,
    /// Running maximum for the in-progress page, taken when the page is flushed
    max_value: Option<ByteArray>,
    /// Bloom filter, if `create_bloom_filter` produced one for this column
    bloom_filter: Option<Sbbf>,
    /// Target false-positive probability passed to `fold_to_target_fpp` when
    /// the bloom filter is flushed
    bloom_filter_target_fpp: f64,
    /// Accumulator from `try_new_geo_stats_accumulator`; presumably only set
    /// for geospatial columns. When set, it is updated instead of min/max.
    geo_stats_accumulator: Option<Box<dyn GeoStatsAccumulator>>,
}
433
434impl ColumnValueEncoder for ByteArrayEncoder {
435    type T = ByteArray;
436    type Values = dyn Array;
437    fn flush_bloom_filter(&mut self) -> Option<Sbbf> {
438        let mut sbbf = self.bloom_filter.take()?;
439        sbbf.fold_to_target_fpp(self.bloom_filter_target_fpp);
440        Some(sbbf)
441    }
442
443    fn try_new(descr: &ColumnDescPtr, props: &WriterProperties) -> Result<Self>
444    where
445        Self: Sized,
446    {
447        let dictionary = props
448            .dictionary_enabled(descr.path())
449            .then(DictEncoder::default);
450
451        let fallback = FallbackEncoder::new(descr, props)?;
452
453        let (bloom_filter, bloom_filter_target_fpp) = create_bloom_filter(props, descr)?;
454
455        let statistics_enabled = props.statistics_enabled(descr.path());
456
457        let geo_stats_accumulator = try_new_geo_stats_accumulator(descr);
458
459        Ok(Self {
460            fallback,
461            statistics_enabled,
462            bloom_filter,
463            bloom_filter_target_fpp,
464            dict_encoder: dictionary,
465            min_value: None,
466            max_value: None,
467            geo_stats_accumulator,
468        })
469    }
470
471    fn write(&mut self, _values: &Self::Values, _offset: usize, _len: usize) -> Result<()> {
472        unreachable!("should call write_gather instead")
473    }
474
475    fn write_gather(&mut self, values: &Self::Values, indices: &[usize]) -> Result<()> {
476        downcast_op!(
477            values.data_type(),
478            values,
479            encode,
480            indices.iter().copied(),
481            self
482        );
483        Ok(())
484    }
485
486    fn count_values_within_byte_budget_gather(
487        values: &Self::Values,
488        indices: &[usize],
489        byte_budget: usize,
490    ) -> Option<usize> {
491        // `ByteArrayEncoder` only ever writes via `write_gather`, so this
492        // is the relevant method.
493        //
494        // Two-stage walk for the simple offset-buffer byte array types:
495        //   1. If indices are contiguous, compute the total payload in
496        //      O(1) via a single subtraction on the offsets buffer.
497        //      When the total fits the budget — the overwhelmingly
498        //      common "small values" case — return immediately.
499        //   2. Otherwise, walk per-value byte sizes from the offsets
500        //      buffer (still cheap, no slice/UTF-8 construction) and
501        //      exit at the first value that pushes the cumulative sum
502        //      past the budget. This bounds skewed distributions: an
503        //      outlier value is caught wherever it lands in the chunk.
504        let count = match values.data_type() {
505            DataType::Utf8 => count_within_budget_offsets(
506                values.as_any().downcast_ref::<StringArray>().unwrap(),
507                indices,
508                byte_budget,
509            ),
510            DataType::LargeUtf8 => count_within_budget_offsets(
511                values.as_any().downcast_ref::<LargeStringArray>().unwrap(),
512                indices,
513                byte_budget,
514            ),
515            DataType::Binary => count_within_budget_offsets(
516                values.as_any().downcast_ref::<BinaryArray>().unwrap(),
517                indices,
518                byte_budget,
519            ),
520            DataType::LargeBinary => count_within_budget_offsets(
521                values.as_any().downcast_ref::<LargeBinaryArray>().unwrap(),
522                indices,
523                byte_budget,
524            ),
525            // View arrays carry each value's length in the low 32 bits of
526            // its u128 view word, so lengths are scannable without touching
527            // any data buffer — and the common small-value case skips even
528            // that scan via an O(1) conservative bound.
529            DataType::Utf8View => {
530                let array = values.as_any().downcast_ref::<StringViewArray>().unwrap();
531                count_within_budget_views(
532                    array.views(),
533                    indices,
534                    byte_budget,
535                    max_view_value_len(array.data_buffers()),
536                )
537            }
538            DataType::BinaryView => {
539                let array = values.as_any().downcast_ref::<BinaryViewArray>().unwrap();
540                count_within_budget_views(
541                    array.views(),
542                    indices,
543                    byte_budget,
544                    max_view_value_len(array.data_buffers()),
545                )
546            }
547            // The values in an arrow dictionary are already small and
548            // deduplicated, so there is nothing to bound — treat every
549            // chunk as fitting and stay on the batched path. (A per-value
550            // walk through dict keys on every chunk also measured ~+30-80%
551            // slower than `main`.)
552            DataType::Dictionary(_, _) => indices.len(),
553            // Every byte-array type `ByteArrayEncoder` is constructed for
554            // has an explicit arm above. A `Dictionary(value = FixedSizeBinary)`
555            // column hits the `Dictionary(_, _)` arm (its `values.data_type()`
556            // is `Dictionary`), and a bare `FixedSizeBinary` column is routed
557            // to the generic column writer, never this encoder — so no other
558            // type can reach here.
559            data_type => unreachable!("ByteArrayEncoder cannot be constructed for {data_type:?}"),
560        };
561        Some(count)
562    }
563
564    fn num_values(&self) -> usize {
565        match &self.dict_encoder {
566            Some(encoder) => encoder.indices.len(),
567            None => self.fallback.num_values,
568        }
569    }
570
571    fn has_dictionary(&self) -> bool {
572        self.dict_encoder.is_some()
573    }
574
575    fn estimated_memory_size(&self) -> usize {
576        let encoder_size = match &self.dict_encoder {
577            Some(encoder) => encoder.estimated_memory_size(),
578            // For the FallbackEncoder, these unflushed bytes are already encoded.
579            // Therefore, the size should be the same as estimated_data_page_size.
580            None => self.fallback.estimated_data_page_size(),
581        };
582
583        let bloom_filter_size = self
584            .bloom_filter
585            .as_ref()
586            .map(|bf| bf.estimated_memory_size())
587            .unwrap_or_default();
588
589        let stats_size = self.min_value.as_ref().map(|v| v.len()).unwrap_or_default()
590            + self.max_value.as_ref().map(|v| v.len()).unwrap_or_default();
591
592        encoder_size + bloom_filter_size + stats_size
593    }
594
595    fn estimated_dict_page_size(&self) -> Option<usize> {
596        Some(self.dict_encoder.as_ref()?.estimated_dict_page_size())
597    }
598
599    /// Returns an estimate of the data page size in bytes
600    ///
601    /// This includes:
602    /// <already_written_encoded_byte_size> + <estimated_encoded_size_of_unflushed_bytes>
603    fn estimated_data_page_size(&self) -> usize {
604        match &self.dict_encoder {
605            Some(encoder) => encoder.estimated_data_page_size(),
606            None => self.fallback.estimated_data_page_size(),
607        }
608    }
609
610    fn flush_dict_page(&mut self) -> Result<Option<DictionaryPage>> {
611        match self.dict_encoder.take() {
612            Some(encoder) => {
613                if !encoder.indices.is_empty() {
614                    return Err(general_err!(
615                        "Must flush data pages before flushing dictionary"
616                    ));
617                }
618
619                Ok(Some(encoder.flush_dict_page()))
620            }
621            _ => Ok(None),
622        }
623    }
624
625    fn flush_data_page(&mut self) -> Result<DataPageValues<ByteArray>> {
626        let min_value = self.min_value.take();
627        let max_value = self.max_value.take();
628
629        match &mut self.dict_encoder {
630            Some(encoder) => Ok(encoder.flush_data_page(min_value, max_value)),
631            _ => self.fallback.flush_data_page(min_value, max_value),
632        }
633    }
634
635    fn flush_geospatial_statistics(&mut self) -> Option<Box<GeospatialStatistics>> {
636        self.geo_stats_accumulator.as_mut().map(|a| a.finish())?
637    }
638}
639
640/// Encodes the provided `values` and `indices` to `encoder`
641///
642/// This is a free function so it can be used with `downcast_op!`
643fn encode<T, I>(values: T, indices: I, encoder: &mut ByteArrayEncoder)
644where
645    T: ArrayAccessor + Copy,
646    T::Item: Copy + Ord + AsRef<[u8]>,
647    I: ExactSizeIterator<Item = usize> + Clone,
648{
649    if encoder.statistics_enabled != EnabledStatistics::None {
650        if let Some(accumulator) = encoder.geo_stats_accumulator.as_mut() {
651            update_geo_stats_accumulator(accumulator.as_mut(), values, indices.clone());
652        } else if let Some((min, max)) = compute_min_max(values, indices.clone()) {
653            if encoder.min_value.as_ref().is_none_or(|m| m > &min) {
654                encoder.min_value = Some(min);
655            }
656
657            if encoder.max_value.as_ref().is_none_or(|m| m < &max) {
658                encoder.max_value = Some(max);
659            }
660        }
661    }
662
663    // encode the values into bloom filter if enabled
664    if let Some(bloom_filter) = &mut encoder.bloom_filter {
665        for idx in indices.clone() {
666            bloom_filter.insert(values.value(idx).as_ref());
667        }
668    }
669
670    match &mut encoder.dict_encoder {
671        Some(dict_encoder) => dict_encoder.encode(values, indices),
672        None => encoder.fallback.encode(values, indices),
673    }
674}
675
676/// Upper bound on any single value's byte length in a view array.
677fn max_view_value_len(buffers: &[Buffer]) -> usize {
678    /// Bytes that fit inline in a u128 view word (the rest is len + prefix).
679    const MAX_INLINE_VIEW_LEN: usize = 12;
680    // An out-of-line view's data is a contiguous slice of exactly one data
681    // buffer, so it cannot exceed the largest buffer; inline views hold at
682    // most `MAX_INLINE_VIEW_LEN`. Loose (a value is usually far smaller than
683    // a whole buffer) but O(number of buffers) and always sound.
684    buffers
685        .iter()
686        .map(|b| b.len())
687        .max()
688        .unwrap_or(0)
689        .max(MAX_INLINE_VIEW_LEN)
690}
691
/// Number of leading `indices` whose cumulative plain-encoded size fits
/// `byte_budget` (boundary value included), for view arrays (`Utf8View`,
/// `BinaryView`).
///
/// Each value is counted as its length plus a 4-byte length prefix, which is
/// what a plain-encoded BYTE_ARRAY value occupies on the page. If the running
/// total first exceeds the budget at entry `i`, the result is `i + 1`;
/// otherwise it is `indices.len()`.
///
/// `views` are the array's u128 view words; a value's length is their low
/// 32 bits. `max_value_len` must bound every value's length. When
/// `indices.len() * (max_value_len + 4)` fits the budget, no view is read.
fn count_within_budget_views(
    views: &[u128],
    indices: &[usize],
    byte_budget: usize,
    max_value_len: usize,
) -> usize {
    const LEN_PREFIX: usize = std::mem::size_of::<u32>();

    // Fast path: view arrays have no offsets buffer to take an exact span
    // from, so bound every value by `max_value_len` instead.
    let worst_case = indices.len().saturating_mul(max_value_len + LEN_PREFIX);
    if worst_case <= byte_budget {
        return indices.len();
    }

    // Slow path: exact running total over the view lengths, stopping at the
    // first value that overflows the budget.
    let mut used: usize = 0;
    indices
        .iter()
        .position(|&idx| {
            let value_len = (views[idx] as u32) as usize;
            used = used.saturating_add(value_len + LEN_PREFIX);
            used > byte_budget
        })
        .map_or(indices.len(), |crossing| crossing + 1)
}
727
728/// Number of leading `indices` whose cumulative plain-encoded size fits
729/// `byte_budget` (boundary value included), for offset-buffer byte arrays
730/// (`Utf8`/`LargeUtf8`/`Binary`/`LargeBinary`).
731///
732/// `indices` are assumed sorted ascending — they always are here, since
733/// they come from `non_null_indices`, which is built in array order.
734fn count_within_budget_offsets<T: ByteArrayType>(
735    values: &GenericByteArray<T>,
736    indices: &[usize],
737    byte_budget: usize,
738) -> usize {
739    if indices.is_empty() {
740        return 0;
741    }
742    let n = indices.len();
743    let first = indices[0];
744    let last = indices[n - 1];
745    let offsets = values.value_offsets();
746    // Each plain-encoded value carries a 4-byte length prefix on the page.
747    let prefix_overhead = std::mem::size_of::<u32>();
748
749    // Stage 1: O(1) span upper bound. The span `offsets[last+1] -
750    // offsets[first]` covers every array position in `[first, last]`, a
751    // superset of `indices` — and the skipped positions in a nullable
752    // column are nulls with zero offset delta, so the span still equals the
753    // exact payload. If it fits the budget, every value fits. Covers the
754    // common small-value case for both non-null and (sparse) nullable
755    // columns.
756    if last >= first {
757        let payload = (offsets[last + 1] - offsets[first]).as_usize();
758        if payload + n * prefix_overhead <= byte_budget {
759            return n;
760        }
761    }
762
763    // Stage 2: scan per-index lengths from the offsets buffer.
764    let mut cum: usize = 0;
765    for (i, idx) in indices.iter().enumerate() {
766        let len = (offsets[idx + 1] - offsets[*idx]).as_usize() + prefix_overhead;
767        cum = cum.saturating_add(len);
768        if cum > byte_budget {
769            return i + 1;
770        }
771    }
772    n
773}
774
775/// Computes the min and max for the provided array and indices
776///
777/// This is a free function so it can be used with `downcast_op!`
778fn compute_min_max<T>(
779    array: T,
780    mut valid: impl Iterator<Item = usize>,
781) -> Option<(ByteArray, ByteArray)>
782where
783    T: ArrayAccessor,
784    T::Item: Copy + Ord + AsRef<[u8]>,
785{
786    let first_idx = valid.next()?;
787
788    let first_val = array.value(first_idx);
789    let mut min = first_val;
790    let mut max = first_val;
791    for idx in valid {
792        let val = array.value(idx);
793        min = min.min(val);
794        max = max.max(val);
795    }
796    Some((min.as_ref().to_vec().into(), max.as_ref().to_vec().into()))
797}
798
799/// Updates geospatial statistics for the provided array and indices
800fn update_geo_stats_accumulator<T>(
801    bounder: &mut dyn GeoStatsAccumulator,
802    array: T,
803    valid: impl Iterator<Item = usize>,
804) where
805    T: ArrayAccessor,
806    T::Item: Copy + Ord + AsRef<[u8]>,
807{
808    if bounder.is_valid() {
809        for idx in valid {
810            let val = array.value(idx);
811            bounder.update_wkb(val.as_ref());
812        }
813    }
814}