arrow_row/
variable.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::null_sentinel;
19use arrow_array::builder::BufferBuilder;
20use arrow_array::*;
21use arrow_buffer::bit_util::ceil;
22use arrow_buffer::MutableBuffer;
23use arrow_data::ArrayDataBuilder;
24use arrow_schema::{DataType, SortOptions};
25use builder::make_view;
26
/// Number of data bytes held by each full-size block of the variable length encoding
pub const BLOCK_SIZE: usize = 32;

/// Number of mini-blocks that the first [`BLOCK_SIZE`] bytes of a value are split into
///
/// Short values then only pay for the mini-blocks they actually use, which
/// reduces the space amplification for small strings
pub const MINI_BLOCK_COUNT: usize = 4;

/// Number of data bytes held by each mini-block
pub const MINI_BLOCK_SIZE: usize = BLOCK_SIZE / MINI_BLOCK_COUNT;

/// Marker byte written after a block when further blocks of the same value follow
pub const BLOCK_CONTINUATION: u8 = u8::MAX;

/// Leading byte of an empty (zero-length, non-null) value
pub const EMPTY_SENTINEL: u8 = 0b01;

/// Leading byte of a non-empty value
pub const NON_EMPTY_SENTINEL: u8 = 0b10;
46
47/// Returns the length of the encoded representation of a byte array, including the null byte
48#[inline]
49pub fn encoded_len(a: Option<&[u8]>) -> usize {
50    padded_length(a.map(|x| x.len()))
51}
52
53/// Returns the padded length of the encoded length of the given length
54#[inline]
55pub fn padded_length(a: Option<usize>) -> usize {
56    match a {
57        Some(a) if a <= BLOCK_SIZE => 1 + ceil(a, MINI_BLOCK_SIZE) * (MINI_BLOCK_SIZE + 1),
58        // Each miniblock ends with a 1 byte continuation, therefore add
59        // `(MINI_BLOCK_COUNT - 1)` additional bytes over non-miniblock size
60        Some(a) => MINI_BLOCK_COUNT + ceil(a, BLOCK_SIZE) * (BLOCK_SIZE + 1),
61        None => 1,
62    }
63}
64
65/// Variable length values are encoded as
66///
67/// - single `0_u8` if null
68/// - single `1_u8` if empty array
69/// - `2_u8` if not empty, followed by one or more blocks
70///
71/// where a block is encoded as
72///
73/// - [`BLOCK_SIZE`] bytes of string data, padded with 0s
74/// - `0xFF_u8` if this is not the last block for this string
75/// - otherwise the length of the block as a `u8`
76pub fn encode<'a, I: Iterator<Item = Option<&'a [u8]>>>(
77    data: &mut [u8],
78    offsets: &mut [usize],
79    i: I,
80    opts: SortOptions,
81) {
82    for (offset, maybe_val) in offsets.iter_mut().skip(1).zip(i) {
83        *offset += encode_one(&mut data[*offset..], maybe_val, opts);
84    }
85}
86
87pub fn encode_null(out: &mut [u8], opts: SortOptions) -> usize {
88    out[0] = null_sentinel(opts);
89    1
90}
91
92pub fn encode_empty(out: &mut [u8], opts: SortOptions) -> usize {
93    out[0] = match opts.descending {
94        true => !EMPTY_SENTINEL,
95        false => EMPTY_SENTINEL,
96    };
97    1
98}
99
100pub fn encode_one(out: &mut [u8], val: Option<&[u8]>, opts: SortOptions) -> usize {
101    match val {
102        None => encode_null(out, opts),
103        Some([]) => encode_empty(out, opts),
104        Some(val) => {
105            // Write `2_u8` to demarcate as non-empty, non-null string
106            out[0] = NON_EMPTY_SENTINEL;
107
108            let len = if val.len() <= BLOCK_SIZE {
109                1 + encode_blocks::<MINI_BLOCK_SIZE>(&mut out[1..], val)
110            } else {
111                let (initial, rem) = val.split_at(BLOCK_SIZE);
112                let offset = encode_blocks::<MINI_BLOCK_SIZE>(&mut out[1..], initial);
113                out[offset] = BLOCK_CONTINUATION;
114                1 + offset + encode_blocks::<BLOCK_SIZE>(&mut out[1 + offset..], rem)
115            };
116
117            if opts.descending {
118                // Invert bits
119                out[..len].iter_mut().for_each(|v| *v = !*v)
120            }
121            len
122        }
123    }
124}
125
126/// Writes `val` in `SIZE` blocks with the appropriate continuation tokens
127#[inline]
128fn encode_blocks<const SIZE: usize>(out: &mut [u8], val: &[u8]) -> usize {
129    let block_count = ceil(val.len(), SIZE);
130    let end_offset = block_count * (SIZE + 1);
131    let to_write = &mut out[..end_offset];
132
133    let chunks = val.chunks_exact(SIZE);
134    let remainder = chunks.remainder();
135    for (input, output) in chunks.clone().zip(to_write.chunks_exact_mut(SIZE + 1)) {
136        let input: &[u8; SIZE] = input.try_into().unwrap();
137        let out_block: &mut [u8; SIZE] = (&mut output[..SIZE]).try_into().unwrap();
138
139        *out_block = *input;
140
141        // Indicate that there are further blocks to follow
142        output[SIZE] = BLOCK_CONTINUATION;
143    }
144
145    if !remainder.is_empty() {
146        let start_offset = (block_count - 1) * (SIZE + 1);
147        to_write[start_offset..start_offset + remainder.len()].copy_from_slice(remainder);
148        *to_write.last_mut().unwrap() = remainder.len() as u8;
149    } else {
150        // We must overwrite the continuation marker written by the loop above
151        *to_write.last_mut().unwrap() = SIZE as u8;
152    }
153    end_offset
154}
155
156/// Decodes a single block of data
157/// The `f` function accepts a slice of the decoded data, it may be called multiple times
158pub fn decode_blocks(row: &[u8], options: SortOptions, mut f: impl FnMut(&[u8])) -> usize {
159    let (non_empty_sentinel, continuation) = match options.descending {
160        true => (!NON_EMPTY_SENTINEL, !BLOCK_CONTINUATION),
161        false => (NON_EMPTY_SENTINEL, BLOCK_CONTINUATION),
162    };
163
164    if row[0] != non_empty_sentinel {
165        // Empty or null string
166        return 1;
167    }
168
169    // Extracts the block length from the sentinel
170    let block_len = |sentinel: u8| match options.descending {
171        true => !sentinel as usize,
172        false => sentinel as usize,
173    };
174
175    let mut idx = 1;
176    for _ in 0..MINI_BLOCK_COUNT {
177        let sentinel = row[idx + MINI_BLOCK_SIZE];
178        if sentinel != continuation {
179            f(&row[idx..idx + block_len(sentinel)]);
180            return idx + MINI_BLOCK_SIZE + 1;
181        }
182        f(&row[idx..idx + MINI_BLOCK_SIZE]);
183        idx += MINI_BLOCK_SIZE + 1;
184    }
185
186    loop {
187        let sentinel = row[idx + BLOCK_SIZE];
188        if sentinel != continuation {
189            f(&row[idx..idx + block_len(sentinel)]);
190            return idx + BLOCK_SIZE + 1;
191        }
192        f(&row[idx..idx + BLOCK_SIZE]);
193        idx += BLOCK_SIZE + 1;
194    }
195}
196
197/// Returns the number of bytes of encoded data
198fn decoded_len(row: &[u8], options: SortOptions) -> usize {
199    let mut len = 0;
200    decode_blocks(row, options, |block| len += block.len());
201    len
202}
203
204/// Decodes a binary array from `rows` with the provided `options`
205pub fn decode_binary<I: OffsetSizeTrait>(
206    rows: &mut [&[u8]],
207    options: SortOptions,
208) -> GenericBinaryArray<I> {
209    let len = rows.len();
210    let mut null_count = 0;
211    let nulls = MutableBuffer::collect_bool(len, |x| {
212        let valid = rows[x][0] != null_sentinel(options);
213        null_count += !valid as usize;
214        valid
215    });
216
217    let values_capacity = rows.iter().map(|row| decoded_len(row, options)).sum();
218    let mut offsets = BufferBuilder::<I>::new(len + 1);
219    offsets.append(I::zero());
220    let mut values = MutableBuffer::new(values_capacity);
221
222    for row in rows {
223        let offset = decode_blocks(row, options, |b| values.extend_from_slice(b));
224        *row = &row[offset..];
225        offsets.append(I::from_usize(values.len()).expect("offset overflow"))
226    }
227
228    if options.descending {
229        values.as_slice_mut().iter_mut().for_each(|o| *o = !*o)
230    }
231
232    let d = match I::IS_LARGE {
233        true => DataType::LargeBinary,
234        false => DataType::Binary,
235    };
236
237    let builder = ArrayDataBuilder::new(d)
238        .len(len)
239        .null_count(null_count)
240        .null_bit_buffer(Some(nulls.into()))
241        .add_buffer(offsets.finish())
242        .add_buffer(values.into());
243
244    // SAFETY:
245    // Valid by construction above
246    unsafe { GenericBinaryArray::from(builder.build_unchecked()) }
247}
248
249fn decode_binary_view_inner(
250    rows: &mut [&[u8]],
251    options: SortOptions,
252    check_utf8: bool,
253) -> BinaryViewArray {
254    let len = rows.len();
255
256    let mut null_count = 0;
257
258    let nulls = MutableBuffer::collect_bool(len, |x| {
259        let valid = rows[x][0] != null_sentinel(options);
260        null_count += !valid as usize;
261        valid
262    });
263
264    let values_capacity: usize = rows.iter().map(|row| decoded_len(row, options)).sum();
265    let mut values = MutableBuffer::new(values_capacity);
266    let mut views = BufferBuilder::<u128>::new(len);
267
268    for row in rows {
269        let start_offset = values.len();
270        let offset = decode_blocks(row, options, |b| values.extend_from_slice(b));
271        if row[0] == null_sentinel(options) {
272            debug_assert_eq!(offset, 1);
273            debug_assert_eq!(start_offset, values.len());
274            views.append(0);
275        } else {
276            // Safety: we just appended the data to the end of the buffer
277            let val = unsafe { values.get_unchecked_mut(start_offset..) };
278
279            if options.descending {
280                val.iter_mut().for_each(|o| *o = !*o);
281            }
282
283            let view = make_view(val, 0, start_offset as u32);
284            views.append(view);
285        }
286        *row = &row[offset..];
287    }
288
289    if check_utf8 {
290        // the values contains all data, no matter if it is short or long
291        // we can validate utf8 in one go.
292        std::str::from_utf8(values.as_slice()).unwrap();
293    }
294
295    let builder = ArrayDataBuilder::new(DataType::BinaryView)
296        .len(len)
297        .null_count(null_count)
298        .null_bit_buffer(Some(nulls.into()))
299        .add_buffer(views.finish())
300        .add_buffer(values.into());
301
302    // SAFETY:
303    // Valid by construction above
304    unsafe { BinaryViewArray::from(builder.build_unchecked()) }
305}
306
307/// Decodes a binary view array from `rows` with the provided `options`
308pub fn decode_binary_view(rows: &mut [&[u8]], options: SortOptions) -> BinaryViewArray {
309    decode_binary_view_inner(rows, options, false)
310}
311
312/// Decodes a string array from `rows` with the provided `options`
313///
314/// # Safety
315///
316/// The row must contain valid UTF-8 data
317pub unsafe fn decode_string<I: OffsetSizeTrait>(
318    rows: &mut [&[u8]],
319    options: SortOptions,
320    validate_utf8: bool,
321) -> GenericStringArray<I> {
322    let decoded = decode_binary::<I>(rows, options);
323
324    if validate_utf8 {
325        return GenericStringArray::from(decoded);
326    }
327
328    let builder = decoded
329        .into_data()
330        .into_builder()
331        .data_type(GenericStringArray::<I>::DATA_TYPE);
332
333    // SAFETY:
334    // Row data must have come from a valid UTF-8 array
335    GenericStringArray::from(builder.build_unchecked())
336}
337
338/// Decodes a string view array from `rows` with the provided `options`
339///
340/// # Safety
341///
342/// The row must contain valid UTF-8 data
343pub unsafe fn decode_string_view(
344    rows: &mut [&[u8]],
345    options: SortOptions,
346    validate_utf8: bool,
347) -> StringViewArray {
348    let view = decode_binary_view_inner(rows, options, validate_utf8);
349    view.to_string_view_unchecked()
350}