arrow_row/
variable.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::null_sentinel;
19use arrow_array::builder::BufferBuilder;
20use arrow_array::types::ByteArrayType;
21use arrow_array::*;
22use arrow_buffer::bit_util::ceil;
23use arrow_buffer::{ArrowNativeType, MutableBuffer};
24use arrow_data::{ArrayDataBuilder, MAX_INLINE_VIEW_LEN};
25use arrow_schema::{DataType, SortOptions};
26use builder::make_view;
27
/// Number of value bytes held by each full block of the variable length encoding
pub const BLOCK_SIZE: usize = 32;

/// Number of mini blocks that the first [`BLOCK_SIZE`] bytes of a value are split into
///
/// Encoding the start of a value in smaller blocks reduces the padding (space
/// amplification) paid by short values
pub const MINI_BLOCK_COUNT: usize = 4;

/// Number of value bytes held by each mini block
pub const MINI_BLOCK_SIZE: usize = BLOCK_SIZE / MINI_BLOCK_COUNT;

/// Marker byte written after a block when further blocks of the same value follow
pub const BLOCK_CONTINUATION: u8 = u8::MAX;

/// Leading byte of an empty (zero-length) value, before any descending inversion
pub const EMPTY_SENTINEL: u8 = 0x01;

/// Leading byte of a non-empty value, before any descending inversion
pub const NON_EMPTY_SENTINEL: u8 = 0x02;
47
48/// Returns the padded length of the encoded length of the given length
49#[inline]
50pub fn padded_length(a: Option<usize>) -> usize {
51    match a {
52        Some(a) => non_null_padded_length(a),
53        None => 1,
54    }
55}
56
57/// Returns the padded length of the encoded length of the given length
58#[inline]
59pub(crate) fn non_null_padded_length(len: usize) -> usize {
60    if len <= BLOCK_SIZE {
61        1 + ceil(len, MINI_BLOCK_SIZE) * (MINI_BLOCK_SIZE + 1)
62    } else {
63        // Each miniblock ends with a 1 byte continuation, therefore add
64        // `(MINI_BLOCK_COUNT - 1)` additional bytes over non-miniblock size
65        MINI_BLOCK_COUNT + ceil(len, BLOCK_SIZE) * (BLOCK_SIZE + 1)
66    }
67}
68
69/// Variable length values are encoded as
70///
71/// - single `0_u8` if null
72/// - single `1_u8` if empty array
73/// - `2_u8` if not empty, followed by one or more blocks
74///
75/// where a block is encoded as
76///
77/// - [`BLOCK_SIZE`] bytes of string data, padded with 0s
78/// - `0xFF_u8` if this is not the last block for this string
79/// - otherwise the length of the block as a `u8`
80pub fn encode<'a, I: Iterator<Item = Option<&'a [u8]>>>(
81    data: &mut [u8],
82    offsets: &mut [usize],
83    i: I,
84    opts: SortOptions,
85) {
86    for (offset, maybe_val) in offsets.iter_mut().skip(1).zip(i) {
87        *offset += encode_one(&mut data[*offset..], maybe_val, opts);
88    }
89}
90
91/// Calls [`encode`] with optimized iterator for generic byte arrays
92pub(crate) fn encode_generic_byte_array<T: ByteArrayType>(
93    data: &mut [u8],
94    offsets: &mut [usize],
95    input_array: &GenericByteArray<T>,
96    opts: SortOptions,
97) {
98    let input_offsets = input_array.value_offsets();
99    let bytes = input_array.values().as_slice();
100
101    if let Some(null_buffer) = input_array.nulls().filter(|x| x.null_count() > 0) {
102        let input_iter =
103            input_offsets
104                .windows(2)
105                .zip(null_buffer.iter())
106                .map(|(start_end, is_valid)| {
107                    if is_valid {
108                        let item_range = start_end[0].as_usize()..start_end[1].as_usize();
109                        // SAFETY: the offsets of the input are valid by construction
110                        // so it is ok to use unsafe here
111                        let item = unsafe { bytes.get_unchecked(item_range) };
112                        Some(item)
113                    } else {
114                        None
115                    }
116                });
117
118        encode(data, offsets, input_iter, opts);
119    } else {
120        // Skip null checks
121        let input_iter = input_offsets.windows(2).map(|start_end| {
122            let item_range = start_end[0].as_usize()..start_end[1].as_usize();
123            // SAFETY: the offsets of the input are valid by construction
124            // so it is ok to use unsafe here
125            let item = unsafe { bytes.get_unchecked(item_range) };
126            Some(item)
127        });
128
129        encode(data, offsets, input_iter, opts);
130    }
131}
132
133pub fn encode_null(out: &mut [u8], opts: SortOptions) -> usize {
134    out[0] = null_sentinel(opts);
135    1
136}
137
138pub fn encode_empty(out: &mut [u8], opts: SortOptions) -> usize {
139    out[0] = match opts.descending {
140        true => !EMPTY_SENTINEL,
141        false => EMPTY_SENTINEL,
142    };
143    1
144}
145
146#[inline]
147pub fn encode_one(out: &mut [u8], val: Option<&[u8]>, opts: SortOptions) -> usize {
148    match val {
149        None => encode_null(out, opts),
150        Some([]) => encode_empty(out, opts),
151        Some(val) => {
152            // Write `2_u8` to demarcate as non-empty, non-null string
153            out[0] = NON_EMPTY_SENTINEL;
154
155            let len = if val.len() <= BLOCK_SIZE {
156                1 + encode_blocks::<MINI_BLOCK_SIZE>(&mut out[1..], val)
157            } else {
158                let (initial, rem) = val.split_at(BLOCK_SIZE);
159                let offset = encode_blocks::<MINI_BLOCK_SIZE>(&mut out[1..], initial);
160                out[offset] = BLOCK_CONTINUATION;
161                1 + offset + encode_blocks::<BLOCK_SIZE>(&mut out[1 + offset..], rem)
162            };
163
164            if opts.descending {
165                // Invert bits
166                out[..len].iter_mut().for_each(|v| *v = !*v)
167            }
168            len
169        }
170    }
171}
172
173/// Writes `val` in `SIZE` blocks with the appropriate continuation tokens
174#[inline]
175fn encode_blocks<const SIZE: usize>(out: &mut [u8], val: &[u8]) -> usize {
176    let block_count = ceil(val.len(), SIZE);
177    let end_offset = block_count * (SIZE + 1);
178    let to_write = &mut out[..end_offset];
179
180    let chunks = val.chunks_exact(SIZE);
181    let remainder = chunks.remainder();
182    for (input, output) in chunks.clone().zip(to_write.chunks_exact_mut(SIZE + 1)) {
183        let input: &[u8; SIZE] = input.try_into().unwrap();
184        let out_block: &mut [u8; SIZE] = (&mut output[..SIZE]).try_into().unwrap();
185
186        *out_block = *input;
187
188        // Indicate that there are further blocks to follow
189        output[SIZE] = BLOCK_CONTINUATION;
190    }
191
192    if !remainder.is_empty() {
193        let start_offset = (block_count - 1) * (SIZE + 1);
194        to_write[start_offset..start_offset + remainder.len()].copy_from_slice(remainder);
195        *to_write.last_mut().unwrap() = remainder.len() as u8;
196    } else {
197        // We must overwrite the continuation marker written by the loop above
198        *to_write.last_mut().unwrap() = SIZE as u8;
199    }
200    end_offset
201}
202
203/// Decodes a single block of data
204/// The `f` function accepts a slice of the decoded data, it may be called multiple times
205pub fn decode_blocks(row: &[u8], options: SortOptions, mut f: impl FnMut(&[u8])) -> usize {
206    let (non_empty_sentinel, continuation) = match options.descending {
207        true => (!NON_EMPTY_SENTINEL, !BLOCK_CONTINUATION),
208        false => (NON_EMPTY_SENTINEL, BLOCK_CONTINUATION),
209    };
210
211    if row[0] != non_empty_sentinel {
212        // Empty or null string
213        return 1;
214    }
215
216    // Extracts the block length from the sentinel
217    let block_len = |sentinel: u8| match options.descending {
218        true => !sentinel as usize,
219        false => sentinel as usize,
220    };
221
222    let mut idx = 1;
223    for _ in 0..MINI_BLOCK_COUNT {
224        let sentinel = row[idx + MINI_BLOCK_SIZE];
225        if sentinel != continuation {
226            f(&row[idx..idx + block_len(sentinel)]);
227            return idx + MINI_BLOCK_SIZE + 1;
228        }
229        f(&row[idx..idx + MINI_BLOCK_SIZE]);
230        idx += MINI_BLOCK_SIZE + 1;
231    }
232
233    loop {
234        let sentinel = row[idx + BLOCK_SIZE];
235        if sentinel != continuation {
236            f(&row[idx..idx + block_len(sentinel)]);
237            return idx + BLOCK_SIZE + 1;
238        }
239        f(&row[idx..idx + BLOCK_SIZE]);
240        idx += BLOCK_SIZE + 1;
241    }
242}
243
244/// Returns the number of bytes of encoded data
245fn decoded_len(row: &[u8], options: SortOptions) -> usize {
246    let mut len = 0;
247    decode_blocks(row, options, |block| len += block.len());
248    len
249}
250
251/// Decodes a binary array from `rows` with the provided `options`
252pub fn decode_binary<I: OffsetSizeTrait>(
253    rows: &mut [&[u8]],
254    options: SortOptions,
255) -> GenericBinaryArray<I> {
256    let len = rows.len();
257    let mut null_count = 0;
258    let nulls = MutableBuffer::collect_bool(len, |x| {
259        let valid = rows[x][0] != null_sentinel(options);
260        null_count += !valid as usize;
261        valid
262    });
263
264    let values_capacity = rows.iter().map(|row| decoded_len(row, options)).sum();
265    let mut offsets = BufferBuilder::<I>::new(len + 1);
266    offsets.append(I::zero());
267    let mut values = MutableBuffer::new(values_capacity);
268
269    for row in rows {
270        let offset = decode_blocks(row, options, |b| values.extend_from_slice(b));
271        *row = &row[offset..];
272        offsets.append(I::from_usize(values.len()).expect("offset overflow"))
273    }
274
275    if options.descending {
276        values.as_slice_mut().iter_mut().for_each(|o| *o = !*o)
277    }
278
279    let d = match I::IS_LARGE {
280        true => DataType::LargeBinary,
281        false => DataType::Binary,
282    };
283
284    let builder = ArrayDataBuilder::new(d)
285        .len(len)
286        .null_count(null_count)
287        .null_bit_buffer(Some(nulls.into()))
288        .add_buffer(offsets.finish())
289        .add_buffer(values.into());
290
291    // SAFETY:
292    // Valid by construction above
293    unsafe { GenericBinaryArray::from(builder.build_unchecked()) }
294}
295
296fn decode_binary_view_inner(
297    rows: &mut [&[u8]],
298    options: SortOptions,
299    validate_utf8: bool,
300) -> BinaryViewArray {
301    let len = rows.len();
302    let inline_str_max_len = MAX_INLINE_VIEW_LEN as usize;
303
304    let mut null_count = 0;
305
306    let nulls = MutableBuffer::collect_bool(len, |x| {
307        let valid = rows[x][0] != null_sentinel(options);
308        null_count += !valid as usize;
309        valid
310    });
311
312    // If we are validating UTF-8, decode all string values (including short strings)
313    // into the values buffer and validate UTF-8 once. If not validating,
314    // we save memory by only copying long strings to the values buffer, as short strings
315    // will be inlined into the view and do not need to be stored redundantly.
316    let values_capacity = if validate_utf8 {
317        // Capacity for all long and short strings
318        rows.iter().map(|row| decoded_len(row, options)).sum()
319    } else {
320        // Capacity for all long strings plus room for one short string
321        rows.iter().fold(0, |acc, row| {
322            let len = decoded_len(row, options);
323            if len > inline_str_max_len {
324                acc + len
325            } else {
326                acc
327            }
328        }) + inline_str_max_len
329    };
330    let mut values = MutableBuffer::new(values_capacity);
331
332    let mut views = BufferBuilder::<u128>::new(len);
333    for row in rows {
334        let start_offset = values.len();
335        let offset = decode_blocks(row, options, |b| values.extend_from_slice(b));
336        // Measure string length via change in values buffer.
337        // Used to check if decoded value should be truncated (short string) when validate_utf8 is false
338        let decoded_len = values.len() - start_offset;
339        if row[0] == null_sentinel(options) {
340            debug_assert_eq!(offset, 1);
341            debug_assert_eq!(start_offset, values.len());
342            views.append(0);
343        } else {
344            // Safety: we just appended the data to the end of the buffer
345            let val = unsafe { values.get_unchecked_mut(start_offset..) };
346
347            if options.descending {
348                val.iter_mut().for_each(|o| *o = !*o);
349            }
350
351            let view = make_view(val, 0, start_offset as u32);
352            views.append(view);
353
354            // truncate inline string in values buffer if validate_utf8 is false
355            if !validate_utf8 && decoded_len <= inline_str_max_len {
356                values.truncate(start_offset);
357            }
358        }
359        *row = &row[offset..];
360    }
361
362    if validate_utf8 {
363        // the values contains all data, no matter if it is short or long
364        // we can validate utf8 in one go.
365        std::str::from_utf8(values.as_slice()).unwrap();
366    }
367
368    let builder = ArrayDataBuilder::new(DataType::BinaryView)
369        .len(len)
370        .null_count(null_count)
371        .null_bit_buffer(Some(nulls.into()))
372        .add_buffer(views.finish())
373        .add_buffer(values.into());
374
375    // SAFETY:
376    // Valid by construction above
377    unsafe { BinaryViewArray::from(builder.build_unchecked()) }
378}
379
380/// Decodes a binary view array from `rows` with the provided `options`
381pub fn decode_binary_view(rows: &mut [&[u8]], options: SortOptions) -> BinaryViewArray {
382    decode_binary_view_inner(rows, options, false)
383}
384
385/// Decodes a string array from `rows` with the provided `options`
386///
387/// # Safety
388///
389/// The row must contain valid UTF-8 data
390pub unsafe fn decode_string<I: OffsetSizeTrait>(
391    rows: &mut [&[u8]],
392    options: SortOptions,
393    validate_utf8: bool,
394) -> GenericStringArray<I> {
395    let decoded = decode_binary::<I>(rows, options);
396
397    if validate_utf8 {
398        return GenericStringArray::from(decoded);
399    }
400
401    let builder = decoded
402        .into_data()
403        .into_builder()
404        .data_type(GenericStringArray::<I>::DATA_TYPE);
405
406    // SAFETY:
407    // Row data must have come from a valid UTF-8 array
408    GenericStringArray::from(unsafe { builder.build_unchecked() })
409}
410
411/// Decodes a string view array from `rows` with the provided `options`
412///
413/// # Safety
414///
415/// The row must contain valid UTF-8 data
416pub unsafe fn decode_string_view(
417    rows: &mut [&[u8]],
418    options: SortOptions,
419    validate_utf8: bool,
420) -> StringViewArray {
421    let view = decode_binary_view_inner(rows, options, validate_utf8);
422    unsafe { view.to_string_view_unchecked() }
423}