arrow_row/
variable.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::null_sentinel;
19use arrow_array::builder::BufferBuilder;
20use arrow_array::types::ByteArrayType;
21use arrow_array::*;
22use arrow_buffer::bit_util::ceil;
23use arrow_buffer::{ArrowNativeType, MutableBuffer};
24use arrow_data::{ArrayDataBuilder, MAX_INLINE_VIEW_LEN};
25use arrow_schema::{DataType, SortOptions};
26use builder::make_view;
27
/// Number of data bytes carried by each full-size block of the variable length encoding
pub const BLOCK_SIZE: usize = 32;

/// Number of mini-blocks that the first [`BLOCK_SIZE`] bytes of a value are split into
///
/// Encoding the start of a value with smaller blocks keeps short strings compact,
/// reducing space amplification
pub const MINI_BLOCK_COUNT: usize = 4;

/// Number of data bytes carried by each mini-block
pub const MINI_BLOCK_SIZE: usize = BLOCK_SIZE / MINI_BLOCK_COUNT;

/// Trailing byte of a block that is followed by another block of the same value
/// (bitwise-inverted when sorting descending)
pub const BLOCK_CONTINUATION: u8 = u8::MAX;

/// Sentinel byte of an empty, non-null value (bitwise-inverted when sorting descending)
pub const EMPTY_SENTINEL: u8 = 0x01;

/// Sentinel byte preceding the blocks of a non-empty value
/// (bitwise-inverted when sorting descending)
pub const NON_EMPTY_SENTINEL: u8 = 0x02;
47
48/// Returns the length of the encoded representation of a byte array, including the null byte
49#[inline]
50pub fn encoded_len(a: Option<&[u8]>) -> usize {
51    padded_length(a.map(|x| x.len()))
52}
53
54/// Returns the padded length of the encoded length of the given length
55#[inline]
56pub fn padded_length(a: Option<usize>) -> usize {
57    match a {
58        Some(a) if a <= BLOCK_SIZE => 1 + ceil(a, MINI_BLOCK_SIZE) * (MINI_BLOCK_SIZE + 1),
59        // Each miniblock ends with a 1 byte continuation, therefore add
60        // `(MINI_BLOCK_COUNT - 1)` additional bytes over non-miniblock size
61        Some(a) => MINI_BLOCK_COUNT + ceil(a, BLOCK_SIZE) * (BLOCK_SIZE + 1),
62        None => 1,
63    }
64}
65
66/// Variable length values are encoded as
67///
68/// - single `0_u8` if null
69/// - single `1_u8` if empty array
70/// - `2_u8` if not empty, followed by one or more blocks
71///
72/// where a block is encoded as
73///
74/// - [`BLOCK_SIZE`] bytes of string data, padded with 0s
75/// - `0xFF_u8` if this is not the last block for this string
76/// - otherwise the length of the block as a `u8`
77pub fn encode<'a, I: Iterator<Item = Option<&'a [u8]>>>(
78    data: &mut [u8],
79    offsets: &mut [usize],
80    i: I,
81    opts: SortOptions,
82) {
83    for (offset, maybe_val) in offsets.iter_mut().skip(1).zip(i) {
84        *offset += encode_one(&mut data[*offset..], maybe_val, opts);
85    }
86}
87
88/// Calls [`encode`] with optimized iterator for generic byte arrays
89pub(crate) fn encode_generic_byte_array<T: ByteArrayType>(
90    data: &mut [u8],
91    offsets: &mut [usize],
92    input_array: &GenericByteArray<T>,
93    opts: SortOptions,
94) {
95    let input_offsets = input_array.value_offsets();
96    let bytes = input_array.values().as_slice();
97
98    if let Some(null_buffer) = input_array.nulls().filter(|x| x.null_count() > 0) {
99        let input_iter =
100            input_offsets
101                .windows(2)
102                .zip(null_buffer.iter())
103                .map(|(start_end, is_valid)| {
104                    if is_valid {
105                        let item_range = start_end[0].as_usize()..start_end[1].as_usize();
106                        // SAFETY: the offsets of the input are valid by construction
107                        // so it is ok to use unsafe here
108                        let item = unsafe { bytes.get_unchecked(item_range) };
109                        Some(item)
110                    } else {
111                        None
112                    }
113                });
114
115        encode(data, offsets, input_iter, opts);
116    } else {
117        // Skip null checks
118        let input_iter = input_offsets.windows(2).map(|start_end| {
119            let item_range = start_end[0].as_usize()..start_end[1].as_usize();
120            // SAFETY: the offsets of the input are valid by construction
121            // so it is ok to use unsafe here
122            let item = unsafe { bytes.get_unchecked(item_range) };
123            Some(item)
124        });
125
126        encode(data, offsets, input_iter, opts);
127    }
128}
129
130pub fn encode_null(out: &mut [u8], opts: SortOptions) -> usize {
131    out[0] = null_sentinel(opts);
132    1
133}
134
135pub fn encode_empty(out: &mut [u8], opts: SortOptions) -> usize {
136    out[0] = match opts.descending {
137        true => !EMPTY_SENTINEL,
138        false => EMPTY_SENTINEL,
139    };
140    1
141}
142
143#[inline]
144pub fn encode_one(out: &mut [u8], val: Option<&[u8]>, opts: SortOptions) -> usize {
145    match val {
146        None => encode_null(out, opts),
147        Some([]) => encode_empty(out, opts),
148        Some(val) => {
149            // Write `2_u8` to demarcate as non-empty, non-null string
150            out[0] = NON_EMPTY_SENTINEL;
151
152            let len = if val.len() <= BLOCK_SIZE {
153                1 + encode_blocks::<MINI_BLOCK_SIZE>(&mut out[1..], val)
154            } else {
155                let (initial, rem) = val.split_at(BLOCK_SIZE);
156                let offset = encode_blocks::<MINI_BLOCK_SIZE>(&mut out[1..], initial);
157                out[offset] = BLOCK_CONTINUATION;
158                1 + offset + encode_blocks::<BLOCK_SIZE>(&mut out[1 + offset..], rem)
159            };
160
161            if opts.descending {
162                // Invert bits
163                out[..len].iter_mut().for_each(|v| *v = !*v)
164            }
165            len
166        }
167    }
168}
169
170/// Writes `val` in `SIZE` blocks with the appropriate continuation tokens
171#[inline]
172fn encode_blocks<const SIZE: usize>(out: &mut [u8], val: &[u8]) -> usize {
173    let block_count = ceil(val.len(), SIZE);
174    let end_offset = block_count * (SIZE + 1);
175    let to_write = &mut out[..end_offset];
176
177    let chunks = val.chunks_exact(SIZE);
178    let remainder = chunks.remainder();
179    for (input, output) in chunks.clone().zip(to_write.chunks_exact_mut(SIZE + 1)) {
180        let input: &[u8; SIZE] = input.try_into().unwrap();
181        let out_block: &mut [u8; SIZE] = (&mut output[..SIZE]).try_into().unwrap();
182
183        *out_block = *input;
184
185        // Indicate that there are further blocks to follow
186        output[SIZE] = BLOCK_CONTINUATION;
187    }
188
189    if !remainder.is_empty() {
190        let start_offset = (block_count - 1) * (SIZE + 1);
191        to_write[start_offset..start_offset + remainder.len()].copy_from_slice(remainder);
192        *to_write.last_mut().unwrap() = remainder.len() as u8;
193    } else {
194        // We must overwrite the continuation marker written by the loop above
195        *to_write.last_mut().unwrap() = SIZE as u8;
196    }
197    end_offset
198}
199
200/// Decodes a single block of data
201/// The `f` function accepts a slice of the decoded data, it may be called multiple times
202pub fn decode_blocks(row: &[u8], options: SortOptions, mut f: impl FnMut(&[u8])) -> usize {
203    let (non_empty_sentinel, continuation) = match options.descending {
204        true => (!NON_EMPTY_SENTINEL, !BLOCK_CONTINUATION),
205        false => (NON_EMPTY_SENTINEL, BLOCK_CONTINUATION),
206    };
207
208    if row[0] != non_empty_sentinel {
209        // Empty or null string
210        return 1;
211    }
212
213    // Extracts the block length from the sentinel
214    let block_len = |sentinel: u8| match options.descending {
215        true => !sentinel as usize,
216        false => sentinel as usize,
217    };
218
219    let mut idx = 1;
220    for _ in 0..MINI_BLOCK_COUNT {
221        let sentinel = row[idx + MINI_BLOCK_SIZE];
222        if sentinel != continuation {
223            f(&row[idx..idx + block_len(sentinel)]);
224            return idx + MINI_BLOCK_SIZE + 1;
225        }
226        f(&row[idx..idx + MINI_BLOCK_SIZE]);
227        idx += MINI_BLOCK_SIZE + 1;
228    }
229
230    loop {
231        let sentinel = row[idx + BLOCK_SIZE];
232        if sentinel != continuation {
233            f(&row[idx..idx + block_len(sentinel)]);
234            return idx + BLOCK_SIZE + 1;
235        }
236        f(&row[idx..idx + BLOCK_SIZE]);
237        idx += BLOCK_SIZE + 1;
238    }
239}
240
241/// Returns the number of bytes of encoded data
242fn decoded_len(row: &[u8], options: SortOptions) -> usize {
243    let mut len = 0;
244    decode_blocks(row, options, |block| len += block.len());
245    len
246}
247
248/// Decodes a binary array from `rows` with the provided `options`
249pub fn decode_binary<I: OffsetSizeTrait>(
250    rows: &mut [&[u8]],
251    options: SortOptions,
252) -> GenericBinaryArray<I> {
253    let len = rows.len();
254    let mut null_count = 0;
255    let nulls = MutableBuffer::collect_bool(len, |x| {
256        let valid = rows[x][0] != null_sentinel(options);
257        null_count += !valid as usize;
258        valid
259    });
260
261    let values_capacity = rows.iter().map(|row| decoded_len(row, options)).sum();
262    let mut offsets = BufferBuilder::<I>::new(len + 1);
263    offsets.append(I::zero());
264    let mut values = MutableBuffer::new(values_capacity);
265
266    for row in rows {
267        let offset = decode_blocks(row, options, |b| values.extend_from_slice(b));
268        *row = &row[offset..];
269        offsets.append(I::from_usize(values.len()).expect("offset overflow"))
270    }
271
272    if options.descending {
273        values.as_slice_mut().iter_mut().for_each(|o| *o = !*o)
274    }
275
276    let d = match I::IS_LARGE {
277        true => DataType::LargeBinary,
278        false => DataType::Binary,
279    };
280
281    let builder = ArrayDataBuilder::new(d)
282        .len(len)
283        .null_count(null_count)
284        .null_bit_buffer(Some(nulls.into()))
285        .add_buffer(offsets.finish())
286        .add_buffer(values.into());
287
288    // SAFETY:
289    // Valid by construction above
290    unsafe { GenericBinaryArray::from(builder.build_unchecked()) }
291}
292
293fn decode_binary_view_inner(
294    rows: &mut [&[u8]],
295    options: SortOptions,
296    validate_utf8: bool,
297) -> BinaryViewArray {
298    let len = rows.len();
299    let inline_str_max_len = MAX_INLINE_VIEW_LEN as usize;
300
301    let mut null_count = 0;
302
303    let nulls = MutableBuffer::collect_bool(len, |x| {
304        let valid = rows[x][0] != null_sentinel(options);
305        null_count += !valid as usize;
306        valid
307    });
308
309    // If we are validating UTF-8, decode all string values (including short strings)
310    // into the values buffer and validate UTF-8 once. If not validating,
311    // we save memory by only copying long strings to the values buffer, as short strings
312    // will be inlined into the view and do not need to be stored redundantly.
313    let values_capacity = if validate_utf8 {
314        // Capacity for all long and short strings
315        rows.iter().map(|row| decoded_len(row, options)).sum()
316    } else {
317        // Capacity for all long strings plus room for one short string
318        rows.iter().fold(0, |acc, row| {
319            let len = decoded_len(row, options);
320            if len > inline_str_max_len {
321                acc + len
322            } else {
323                acc
324            }
325        }) + inline_str_max_len
326    };
327    let mut values = MutableBuffer::new(values_capacity);
328
329    let mut views = BufferBuilder::<u128>::new(len);
330    for row in rows {
331        let start_offset = values.len();
332        let offset = decode_blocks(row, options, |b| values.extend_from_slice(b));
333        // Measure string length via change in values buffer.
334        // Used to check if decoded value should be truncated (short string) when validate_utf8 is false
335        let decoded_len = values.len() - start_offset;
336        if row[0] == null_sentinel(options) {
337            debug_assert_eq!(offset, 1);
338            debug_assert_eq!(start_offset, values.len());
339            views.append(0);
340        } else {
341            // Safety: we just appended the data to the end of the buffer
342            let val = unsafe { values.get_unchecked_mut(start_offset..) };
343
344            if options.descending {
345                val.iter_mut().for_each(|o| *o = !*o);
346            }
347
348            let view = make_view(val, 0, start_offset as u32);
349            views.append(view);
350
351            // truncate inline string in values buffer if validate_utf8 is false
352            if !validate_utf8 && decoded_len <= inline_str_max_len {
353                values.truncate(start_offset);
354            }
355        }
356        *row = &row[offset..];
357    }
358
359    if validate_utf8 {
360        // the values contains all data, no matter if it is short or long
361        // we can validate utf8 in one go.
362        std::str::from_utf8(values.as_slice()).unwrap();
363    }
364
365    let builder = ArrayDataBuilder::new(DataType::BinaryView)
366        .len(len)
367        .null_count(null_count)
368        .null_bit_buffer(Some(nulls.into()))
369        .add_buffer(views.finish())
370        .add_buffer(values.into());
371
372    // SAFETY:
373    // Valid by construction above
374    unsafe { BinaryViewArray::from(builder.build_unchecked()) }
375}
376
377/// Decodes a binary view array from `rows` with the provided `options`
378pub fn decode_binary_view(rows: &mut [&[u8]], options: SortOptions) -> BinaryViewArray {
379    decode_binary_view_inner(rows, options, false)
380}
381
382/// Decodes a string array from `rows` with the provided `options`
383///
384/// # Safety
385///
386/// The row must contain valid UTF-8 data
387pub unsafe fn decode_string<I: OffsetSizeTrait>(
388    rows: &mut [&[u8]],
389    options: SortOptions,
390    validate_utf8: bool,
391) -> GenericStringArray<I> {
392    let decoded = decode_binary::<I>(rows, options);
393
394    if validate_utf8 {
395        return GenericStringArray::from(decoded);
396    }
397
398    let builder = decoded
399        .into_data()
400        .into_builder()
401        .data_type(GenericStringArray::<I>::DATA_TYPE);
402
403    // SAFETY:
404    // Row data must have come from a valid UTF-8 array
405    GenericStringArray::from(unsafe { builder.build_unchecked() })
406}
407
408/// Decodes a string view array from `rows` with the provided `options`
409///
410/// # Safety
411///
412/// The row must contain valid UTF-8 data
413pub unsafe fn decode_string_view(
414    rows: &mut [&[u8]],
415    options: SortOptions,
416    validate_utf8: bool,
417) -> StringViewArray {
418    let view = decode_binary_view_inner(rows, options, validate_utf8);
419    unsafe { view.to_string_view_unchecked() }
420}