Skip to main content

arrow_row/
variable.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::null_sentinel;
19use arrow_array::builder::BufferBuilder;
20use arrow_array::types::ByteArrayType;
21use arrow_array::*;
22use arrow_buffer::bit_util::ceil;
23use arrow_buffer::{ArrowNativeType, MutableBuffer};
24use arrow_data::{ArrayDataBuilder, MAX_INLINE_VIEW_LEN};
25use arrow_schema::{DataType, SortOptions};
26use builder::make_view;
27
/// Number of data bytes held by each full-sized block of the variable length encoding
pub const BLOCK_SIZE: usize = 32;

/// Number of mini-blocks that the first [`BLOCK_SIZE`] bytes of a value are split into
///
/// Storing short values in smaller mini-blocks reduces the space wasted on padding
pub const MINI_BLOCK_COUNT: usize = 4;

/// Number of data bytes held by each mini-block
pub const MINI_BLOCK_SIZE: usize = BLOCK_SIZE / MINI_BLOCK_COUNT;

/// Byte following a block to indicate that another block follows it
pub const BLOCK_CONTINUATION: u8 = 0xFF;

/// Single byte encoding an empty value (bitwise inverted for descending order)
pub const EMPTY_SENTINEL: u8 = 1;

/// Leading byte of an encoded non-empty value (bitwise inverted for descending order)
pub const NON_EMPTY_SENTINEL: u8 = 2;

/// Byte written after [`NON_EMPTY_SENTINEL`] to encode a value of `DataType::Null`
pub const NULL_VALUE_SENTINEL: u8 = 3;
50
51/// Returns the padded length of the encoded length of the given length
52#[inline]
53pub fn padded_length(a: Option<usize>) -> usize {
54    match a {
55        Some(a) => non_null_padded_length(a),
56        None => 1,
57    }
58}
59
60/// Returns the padded length of the encoded length of the given length
61#[inline]
62pub(crate) fn non_null_padded_length(len: usize) -> usize {
63    if len <= BLOCK_SIZE {
64        1 + ceil(len, MINI_BLOCK_SIZE) * (MINI_BLOCK_SIZE + 1)
65    } else {
66        // Each miniblock ends with a 1 byte continuation, therefore add
67        // `(MINI_BLOCK_COUNT - 1)` additional bytes over non-miniblock size
68        MINI_BLOCK_COUNT + ceil(len, BLOCK_SIZE) * (BLOCK_SIZE + 1)
69    }
70}
71
72/// Variable length values are encoded as
73///
74/// - single `0_u8` if null
75/// - single `1_u8` if empty array
76/// - `2_u8` if not empty, followed by one or more blocks
77///
78/// where a block is encoded as
79///
80/// - [`BLOCK_SIZE`] bytes of string data, padded with 0s
81/// - `0xFF_u8` if this is not the last block for this string
82/// - otherwise the length of the block as a `u8`
83pub fn encode<'a, I: Iterator<Item = Option<&'a [u8]>>>(
84    data: &mut [u8],
85    offsets: &mut [usize],
86    i: I,
87    opts: SortOptions,
88) {
89    for (offset, maybe_val) in offsets.iter_mut().skip(1).zip(i) {
90        *offset += encode_one(&mut data[*offset..], maybe_val, opts);
91    }
92}
93
94/// Calls [`encode`] with optimized iterator for generic byte arrays
95pub(crate) fn encode_generic_byte_array<T: ByteArrayType>(
96    data: &mut [u8],
97    offsets: &mut [usize],
98    input_array: &GenericByteArray<T>,
99    opts: SortOptions,
100) {
101    let input_offsets = input_array.value_offsets();
102    let bytes = input_array.values().as_slice();
103
104    if let Some(null_buffer) = input_array.nulls().filter(|x| x.null_count() > 0) {
105        let input_iter =
106            input_offsets
107                .windows(2)
108                .zip(null_buffer.iter())
109                .map(|(start_end, is_valid)| {
110                    if is_valid {
111                        let item_range = start_end[0].as_usize()..start_end[1].as_usize();
112                        // SAFETY: the offsets of the input are valid by construction
113                        // so it is ok to use unsafe here
114                        let item = unsafe { bytes.get_unchecked(item_range) };
115                        Some(item)
116                    } else {
117                        None
118                    }
119                });
120
121        encode(data, offsets, input_iter, opts);
122    } else {
123        // Skip null checks
124        let input_iter = input_offsets.windows(2).map(|start_end| {
125            let item_range = start_end[0].as_usize()..start_end[1].as_usize();
126            // SAFETY: the offsets of the input are valid by construction
127            // so it is ok to use unsafe here
128            let item = unsafe { bytes.get_unchecked(item_range) };
129            Some(item)
130        });
131
132        encode(data, offsets, input_iter, opts);
133    }
134}
135
136pub fn encode_null(out: &mut [u8], opts: SortOptions) -> usize {
137    out[0] = null_sentinel(opts);
138    1
139}
140
141pub fn encode_empty(out: &mut [u8], opts: SortOptions) -> usize {
142    out[0] = match opts.descending {
143        true => !EMPTY_SENTINEL,
144        false => EMPTY_SENTINEL,
145    };
146    1
147}
148
149/// Ensure `NullArray`s don't get encoded as empty lists which can lose their length
150pub fn encode_null_value(out: &mut [u8], opts: SortOptions) -> usize {
151    out[0] = match opts.descending {
152        true => !NON_EMPTY_SENTINEL,
153        false => NON_EMPTY_SENTINEL,
154    };
155    out[1] = match opts.descending {
156        true => !NULL_VALUE_SENTINEL,
157        false => NULL_VALUE_SENTINEL,
158    };
159    2
160}
161
162#[inline]
163pub fn encode_one(out: &mut [u8], val: Option<&[u8]>, opts: SortOptions) -> usize {
164    match val {
165        None => encode_null(out, opts),
166        Some([]) => encode_empty(out, opts),
167        Some(val) => {
168            // Write `2_u8` to demarcate as non-empty, non-null string
169            out[0] = NON_EMPTY_SENTINEL;
170
171            let len = if val.len() <= BLOCK_SIZE {
172                1 + encode_blocks::<MINI_BLOCK_SIZE>(&mut out[1..], val)
173            } else {
174                let (initial, rem) = val.split_at(BLOCK_SIZE);
175                let offset = encode_blocks::<MINI_BLOCK_SIZE>(&mut out[1..], initial);
176                out[offset] = BLOCK_CONTINUATION;
177                1 + offset + encode_blocks::<BLOCK_SIZE>(&mut out[1 + offset..], rem)
178            };
179
180            if opts.descending {
181                // Invert bits
182                out[..len].iter_mut().for_each(|v| *v = !*v)
183            }
184            len
185        }
186    }
187}
188
189/// Writes `val` in `SIZE` blocks with the appropriate continuation tokens
190#[inline]
191fn encode_blocks<const SIZE: usize>(out: &mut [u8], val: &[u8]) -> usize {
192    let block_count = ceil(val.len(), SIZE);
193    let end_offset = block_count * (SIZE + 1);
194    let to_write = &mut out[..end_offset];
195
196    let chunks = val.chunks_exact(SIZE);
197    let remainder = chunks.remainder();
198    for (input, output) in chunks.clone().zip(to_write.chunks_exact_mut(SIZE + 1)) {
199        let input: &[u8; SIZE] = input.try_into().unwrap();
200        let out_block: &mut [u8; SIZE] = (&mut output[..SIZE]).try_into().unwrap();
201
202        *out_block = *input;
203
204        // Indicate that there are further blocks to follow
205        output[SIZE] = BLOCK_CONTINUATION;
206    }
207
208    if !remainder.is_empty() {
209        let start_offset = (block_count - 1) * (SIZE + 1);
210        to_write[start_offset..start_offset + remainder.len()].copy_from_slice(remainder);
211        *to_write.last_mut().unwrap() = remainder.len() as u8;
212    } else {
213        // We must overwrite the continuation marker written by the loop above
214        *to_write.last_mut().unwrap() = SIZE as u8;
215    }
216    end_offset
217}
218
219/// Decodes a single block of data
220/// The `f` function accepts a slice of the decoded data, it may be called multiple times
221pub fn decode_blocks(row: &[u8], options: SortOptions, mut f: impl FnMut(&[u8])) -> usize {
222    let (non_empty_sentinel, continuation) = match options.descending {
223        true => (!NON_EMPTY_SENTINEL, !BLOCK_CONTINUATION),
224        false => (NON_EMPTY_SENTINEL, BLOCK_CONTINUATION),
225    };
226
227    if row[0] != non_empty_sentinel {
228        // Empty or null string
229        return 1;
230    }
231
232    // Extracts the block length from the sentinel
233    let block_len = |sentinel: u8| match options.descending {
234        true => !sentinel as usize,
235        false => sentinel as usize,
236    };
237
238    let mut idx = 1;
239    for _ in 0..MINI_BLOCK_COUNT {
240        let sentinel = row[idx + MINI_BLOCK_SIZE];
241        if sentinel != continuation {
242            f(&row[idx..idx + block_len(sentinel)]);
243            return idx + MINI_BLOCK_SIZE + 1;
244        }
245        f(&row[idx..idx + MINI_BLOCK_SIZE]);
246        idx += MINI_BLOCK_SIZE + 1;
247    }
248
249    loop {
250        let sentinel = row[idx + BLOCK_SIZE];
251        if sentinel != continuation {
252            f(&row[idx..idx + block_len(sentinel)]);
253            return idx + BLOCK_SIZE + 1;
254        }
255        f(&row[idx..idx + BLOCK_SIZE]);
256        idx += BLOCK_SIZE + 1;
257    }
258}
259
260/// Returns the number of bytes of encoded data
261fn decoded_len(row: &[u8], options: SortOptions) -> usize {
262    let mut len = 0;
263    decode_blocks(row, options, |block| len += block.len());
264    len
265}
266
267/// Decodes a binary array from `rows` with the provided `options`
268pub fn decode_binary<I: OffsetSizeTrait>(
269    rows: &mut [&[u8]],
270    options: SortOptions,
271) -> GenericBinaryArray<I> {
272    let len = rows.len();
273    let mut null_count = 0;
274    let nulls = MutableBuffer::collect_bool(len, |x| {
275        let valid = rows[x][0] != null_sentinel(options);
276        null_count += !valid as usize;
277        valid
278    });
279
280    let values_capacity = rows.iter().map(|row| decoded_len(row, options)).sum();
281    let mut offsets = BufferBuilder::<I>::new(len + 1);
282    offsets.append(I::zero());
283    let mut values = MutableBuffer::new(values_capacity);
284
285    for row in rows {
286        let offset = decode_blocks(row, options, |b| values.extend_from_slice(b));
287        *row = &row[offset..];
288        offsets.append(I::from_usize(values.len()).expect("offset overflow"))
289    }
290
291    if options.descending {
292        values.as_slice_mut().iter_mut().for_each(|o| *o = !*o)
293    }
294
295    let d = match I::IS_LARGE {
296        true => DataType::LargeBinary,
297        false => DataType::Binary,
298    };
299
300    let builder = ArrayDataBuilder::new(d)
301        .len(len)
302        .null_count(null_count)
303        .null_bit_buffer(Some(nulls.into()))
304        .add_buffer(offsets.finish())
305        .add_buffer(values.into());
306
307    // SAFETY:
308    // Valid by construction above
309    unsafe { GenericBinaryArray::from(builder.build_unchecked()) }
310}
311
312fn decode_binary_view_inner(
313    rows: &mut [&[u8]],
314    options: SortOptions,
315    validate_utf8: bool,
316) -> BinaryViewArray {
317    let len = rows.len();
318    let inline_str_max_len = MAX_INLINE_VIEW_LEN as usize;
319
320    let mut null_count = 0;
321
322    let nulls = MutableBuffer::collect_bool(len, |x| {
323        let valid = rows[x][0] != null_sentinel(options);
324        null_count += !valid as usize;
325        valid
326    });
327
328    // If we are validating UTF-8, decode all string values (including short strings)
329    // into the values buffer and validate UTF-8 once. If not validating,
330    // we save memory by only copying long strings to the values buffer, as short strings
331    // will be inlined into the view and do not need to be stored redundantly.
332    let values_capacity = if validate_utf8 {
333        // Capacity for all long and short strings
334        rows.iter().map(|row| decoded_len(row, options)).sum()
335    } else {
336        // Capacity for all long strings plus room for one short string
337        rows.iter().fold(0, |acc, row| {
338            let len = decoded_len(row, options);
339            if len > inline_str_max_len {
340                acc + len
341            } else {
342                acc
343            }
344        }) + inline_str_max_len
345    };
346    let mut values = MutableBuffer::new(values_capacity);
347
348    let mut views = BufferBuilder::<u128>::new(len);
349    for row in rows {
350        let start_offset = values.len();
351        let offset = decode_blocks(row, options, |b| values.extend_from_slice(b));
352        // Measure string length via change in values buffer.
353        // Used to check if decoded value should be truncated (short string) when validate_utf8 is false
354        let decoded_len = values.len() - start_offset;
355        if row[0] == null_sentinel(options) {
356            debug_assert_eq!(offset, 1);
357            debug_assert_eq!(start_offset, values.len());
358            views.append(0);
359        } else {
360            // Safety: we just appended the data to the end of the buffer
361            let val = unsafe { values.get_unchecked_mut(start_offset..) };
362
363            if options.descending {
364                val.iter_mut().for_each(|o| *o = !*o);
365            }
366
367            let view = make_view(val, 0, start_offset as u32);
368            views.append(view);
369
370            // truncate inline string in values buffer if validate_utf8 is false
371            if !validate_utf8 && decoded_len <= inline_str_max_len {
372                values.truncate(start_offset);
373            }
374        }
375        *row = &row[offset..];
376    }
377
378    if validate_utf8 {
379        // the values contains all data, no matter if it is short or long
380        // we can validate utf8 in one go.
381        std::str::from_utf8(values.as_slice()).unwrap();
382    }
383
384    let builder = ArrayDataBuilder::new(DataType::BinaryView)
385        .len(len)
386        .null_count(null_count)
387        .null_bit_buffer(Some(nulls.into()))
388        .add_buffer(views.finish())
389        .add_buffer(values.into());
390
391    // SAFETY:
392    // Valid by construction above
393    unsafe { BinaryViewArray::from(builder.build_unchecked()) }
394}
395
396/// Decodes a binary view array from `rows` with the provided `options`
397pub fn decode_binary_view(rows: &mut [&[u8]], options: SortOptions) -> BinaryViewArray {
398    decode_binary_view_inner(rows, options, false)
399}
400
401/// Decodes a string array from `rows` with the provided `options`
402///
403/// # Safety
404///
405/// The row must contain valid UTF-8 data
406pub unsafe fn decode_string<I: OffsetSizeTrait>(
407    rows: &mut [&[u8]],
408    options: SortOptions,
409    validate_utf8: bool,
410) -> GenericStringArray<I> {
411    let decoded = decode_binary::<I>(rows, options);
412
413    if validate_utf8 {
414        return GenericStringArray::from(decoded);
415    }
416
417    let builder = decoded
418        .into_data()
419        .into_builder()
420        .data_type(GenericStringArray::<I>::DATA_TYPE);
421
422    // SAFETY:
423    // Row data must have come from a valid UTF-8 array
424    GenericStringArray::from(unsafe { builder.build_unchecked() })
425}
426
427/// Decodes a string view array from `rows` with the provided `options`
428///
429/// # Safety
430///
431/// The row must contain valid UTF-8 data
432pub unsafe fn decode_string_view(
433    rows: &mut [&[u8]],
434    options: SortOptions,
435    validate_utf8: bool,
436) -> StringViewArray {
437    let view = decode_binary_view_inner(rows, options, validate_utf8);
438    unsafe { view.to_string_view_unchecked() }
439}
440
441pub fn decode_null_value(rows: &mut [&[u8]], options: SortOptions) {
442    for row in rows.iter_mut() {
443        let (sentinel1, sentinel2) = match options.descending {
444            true => (!NON_EMPTY_SENTINEL, !NULL_VALUE_SENTINEL),
445            false => (NON_EMPTY_SENTINEL, NULL_VALUE_SENTINEL),
446        };
447        debug_assert_eq!(row[0], sentinel1, "Expected NULL_VALUE_SENTINEL at byte 0");
448        debug_assert_eq!(row[1], sentinel2, "Expected NULL_VALUE_SENTINEL at byte 1");
449        *row = &row[2..];
450    }
451}