1use crate::null_sentinel;
19use arrow_array::builder::BufferBuilder;
20use arrow_array::types::ByteArrayType;
21use arrow_array::*;
22use arrow_buffer::bit_util::ceil;
23use arrow_buffer::{ArrowNativeType, MutableBuffer};
24use arrow_data::{ArrayDataBuilder, MAX_INLINE_VIEW_LEN};
25use arrow_schema::{DataType, SortOptions};
26use builder::make_view;
27
/// Number of payload bytes carried by every full-size block of an encoded value.
pub const BLOCK_SIZE: usize = 32;

/// Number of mini-blocks that the leading [`BLOCK_SIZE`] bytes of a value are split into.
pub const MINI_BLOCK_COUNT: usize = 4;

/// Number of payload bytes carried by every mini-block.
pub const MINI_BLOCK_SIZE: usize = BLOCK_SIZE / MINI_BLOCK_COUNT;

/// Marker byte stored after a block when further blocks of the same value follow.
pub const BLOCK_CONTINUATION: u8 = u8::MAX;

/// Single-byte encoding of an empty (zero-length) value.
pub const EMPTY_SENTINEL: u8 = 0x01;

/// Leading byte of the encoding of a non-empty value.
pub const NON_EMPTY_SENTINEL: u8 = 0x02;

/// Byte written directly after [`NON_EMPTY_SENTINEL`] by `encode_null_value`.
pub const NULL_VALUE_SENTINEL: u8 = 0x03;
50
51#[inline]
53pub fn padded_length(a: Option<usize>) -> usize {
54 match a {
55 Some(a) => non_null_padded_length(a),
56 None => 1,
57 }
58}
59
60#[inline]
62pub(crate) fn non_null_padded_length(len: usize) -> usize {
63 if len <= BLOCK_SIZE {
64 1 + ceil(len, MINI_BLOCK_SIZE) * (MINI_BLOCK_SIZE + 1)
65 } else {
66 MINI_BLOCK_COUNT + ceil(len, BLOCK_SIZE) * (BLOCK_SIZE + 1)
69 }
70}
71
72pub fn encode<'a, I: Iterator<Item = Option<&'a [u8]>>>(
84 data: &mut [u8],
85 offsets: &mut [usize],
86 i: I,
87 opts: SortOptions,
88) {
89 for (offset, maybe_val) in offsets.iter_mut().skip(1).zip(i) {
90 *offset += encode_one(&mut data[*offset..], maybe_val, opts);
91 }
92}
93
94pub(crate) fn encode_generic_byte_array<T: ByteArrayType>(
96 data: &mut [u8],
97 offsets: &mut [usize],
98 input_array: &GenericByteArray<T>,
99 opts: SortOptions,
100) {
101 let input_offsets = input_array.value_offsets();
102 let bytes = input_array.values().as_slice();
103
104 if let Some(null_buffer) = input_array.nulls().filter(|x| x.null_count() > 0) {
105 let input_iter =
106 input_offsets
107 .windows(2)
108 .zip(null_buffer.iter())
109 .map(|(start_end, is_valid)| {
110 if is_valid {
111 let item_range = start_end[0].as_usize()..start_end[1].as_usize();
112 let item = unsafe { bytes.get_unchecked(item_range) };
115 Some(item)
116 } else {
117 None
118 }
119 });
120
121 encode(data, offsets, input_iter, opts);
122 } else {
123 let input_iter = input_offsets.windows(2).map(|start_end| {
125 let item_range = start_end[0].as_usize()..start_end[1].as_usize();
126 let item = unsafe { bytes.get_unchecked(item_range) };
129 Some(item)
130 });
131
132 encode(data, offsets, input_iter, opts);
133 }
134}
135
136pub fn encode_null(out: &mut [u8], opts: SortOptions) -> usize {
137 out[0] = null_sentinel(opts);
138 1
139}
140
141pub fn encode_empty(out: &mut [u8], opts: SortOptions) -> usize {
142 out[0] = match opts.descending {
143 true => !EMPTY_SENTINEL,
144 false => EMPTY_SENTINEL,
145 };
146 1
147}
148
149pub fn encode_null_value(out: &mut [u8], opts: SortOptions) -> usize {
151 out[0] = match opts.descending {
152 true => !NON_EMPTY_SENTINEL,
153 false => NON_EMPTY_SENTINEL,
154 };
155 out[1] = match opts.descending {
156 true => !NULL_VALUE_SENTINEL,
157 false => NULL_VALUE_SENTINEL,
158 };
159 2
160}
161
162#[inline]
163pub fn encode_one(out: &mut [u8], val: Option<&[u8]>, opts: SortOptions) -> usize {
164 match val {
165 None => encode_null(out, opts),
166 Some([]) => encode_empty(out, opts),
167 Some(val) => {
168 out[0] = NON_EMPTY_SENTINEL;
170
171 let len = if val.len() <= BLOCK_SIZE {
172 1 + encode_blocks::<MINI_BLOCK_SIZE>(&mut out[1..], val)
173 } else {
174 let (initial, rem) = val.split_at(BLOCK_SIZE);
175 let offset = encode_blocks::<MINI_BLOCK_SIZE>(&mut out[1..], initial);
176 out[offset] = BLOCK_CONTINUATION;
177 1 + offset + encode_blocks::<BLOCK_SIZE>(&mut out[1 + offset..], rem)
178 };
179
180 if opts.descending {
181 out[..len].iter_mut().for_each(|v| *v = !*v)
183 }
184 len
185 }
186 }
187}
188
189#[inline]
191fn encode_blocks<const SIZE: usize>(out: &mut [u8], val: &[u8]) -> usize {
192 let block_count = ceil(val.len(), SIZE);
193 let end_offset = block_count * (SIZE + 1);
194 let to_write = &mut out[..end_offset];
195
196 let chunks = val.chunks_exact(SIZE);
197 let remainder = chunks.remainder();
198 for (input, output) in chunks.clone().zip(to_write.chunks_exact_mut(SIZE + 1)) {
199 let input: &[u8; SIZE] = input.try_into().unwrap();
200 let out_block: &mut [u8; SIZE] = (&mut output[..SIZE]).try_into().unwrap();
201
202 *out_block = *input;
203
204 output[SIZE] = BLOCK_CONTINUATION;
206 }
207
208 if !remainder.is_empty() {
209 let start_offset = (block_count - 1) * (SIZE + 1);
210 to_write[start_offset..start_offset + remainder.len()].copy_from_slice(remainder);
211 *to_write.last_mut().unwrap() = remainder.len() as u8;
212 } else {
213 *to_write.last_mut().unwrap() = SIZE as u8;
215 }
216 end_offset
217}
218
219pub fn decode_blocks(row: &[u8], options: SortOptions, mut f: impl FnMut(&[u8])) -> usize {
222 let (non_empty_sentinel, continuation) = match options.descending {
223 true => (!NON_EMPTY_SENTINEL, !BLOCK_CONTINUATION),
224 false => (NON_EMPTY_SENTINEL, BLOCK_CONTINUATION),
225 };
226
227 if row[0] != non_empty_sentinel {
228 return 1;
230 }
231
232 let block_len = |sentinel: u8| match options.descending {
234 true => !sentinel as usize,
235 false => sentinel as usize,
236 };
237
238 let mut idx = 1;
239 for _ in 0..MINI_BLOCK_COUNT {
240 let sentinel = row[idx + MINI_BLOCK_SIZE];
241 if sentinel != continuation {
242 f(&row[idx..idx + block_len(sentinel)]);
243 return idx + MINI_BLOCK_SIZE + 1;
244 }
245 f(&row[idx..idx + MINI_BLOCK_SIZE]);
246 idx += MINI_BLOCK_SIZE + 1;
247 }
248
249 loop {
250 let sentinel = row[idx + BLOCK_SIZE];
251 if sentinel != continuation {
252 f(&row[idx..idx + block_len(sentinel)]);
253 return idx + BLOCK_SIZE + 1;
254 }
255 f(&row[idx..idx + BLOCK_SIZE]);
256 idx += BLOCK_SIZE + 1;
257 }
258}
259
260fn decoded_len(row: &[u8], options: SortOptions) -> usize {
262 let mut len = 0;
263 decode_blocks(row, options, |block| len += block.len());
264 len
265}
266
267pub fn decode_binary<I: OffsetSizeTrait>(
269 rows: &mut [&[u8]],
270 options: SortOptions,
271) -> GenericBinaryArray<I> {
272 let len = rows.len();
273 let mut null_count = 0;
274 let nulls = MutableBuffer::collect_bool(len, |x| {
275 let valid = rows[x][0] != null_sentinel(options);
276 null_count += !valid as usize;
277 valid
278 });
279
280 let values_capacity = rows.iter().map(|row| decoded_len(row, options)).sum();
281 let mut offsets = BufferBuilder::<I>::new(len + 1);
282 offsets.append(I::zero());
283 let mut values = MutableBuffer::new(values_capacity);
284
285 for row in rows {
286 let offset = decode_blocks(row, options, |b| values.extend_from_slice(b));
287 *row = &row[offset..];
288 offsets.append(I::from_usize(values.len()).expect("offset overflow"))
289 }
290
291 if options.descending {
292 values.as_slice_mut().iter_mut().for_each(|o| *o = !*o)
293 }
294
295 let d = match I::IS_LARGE {
296 true => DataType::LargeBinary,
297 false => DataType::Binary,
298 };
299
300 let builder = ArrayDataBuilder::new(d)
301 .len(len)
302 .null_count(null_count)
303 .null_bit_buffer(Some(nulls.into()))
304 .add_buffer(offsets.finish())
305 .add_buffer(values.into());
306
307 unsafe { GenericBinaryArray::from(builder.build_unchecked()) }
310}
311
312fn decode_binary_view_inner(
313 rows: &mut [&[u8]],
314 options: SortOptions,
315 validate_utf8: bool,
316) -> BinaryViewArray {
317 let len = rows.len();
318 let inline_str_max_len = MAX_INLINE_VIEW_LEN as usize;
319
320 let mut null_count = 0;
321
322 let nulls = MutableBuffer::collect_bool(len, |x| {
323 let valid = rows[x][0] != null_sentinel(options);
324 null_count += !valid as usize;
325 valid
326 });
327
328 let values_capacity = if validate_utf8 {
333 rows.iter().map(|row| decoded_len(row, options)).sum()
335 } else {
336 rows.iter().fold(0, |acc, row| {
338 let len = decoded_len(row, options);
339 if len > inline_str_max_len {
340 acc + len
341 } else {
342 acc
343 }
344 }) + inline_str_max_len
345 };
346 let mut values = MutableBuffer::new(values_capacity);
347
348 let mut views = BufferBuilder::<u128>::new(len);
349 for row in rows {
350 let start_offset = values.len();
351 let offset = decode_blocks(row, options, |b| values.extend_from_slice(b));
352 let decoded_len = values.len() - start_offset;
355 if row[0] == null_sentinel(options) {
356 debug_assert_eq!(offset, 1);
357 debug_assert_eq!(start_offset, values.len());
358 views.append(0);
359 } else {
360 let val = unsafe { values.get_unchecked_mut(start_offset..) };
362
363 if options.descending {
364 val.iter_mut().for_each(|o| *o = !*o);
365 }
366
367 let view = make_view(val, 0, start_offset as u32);
368 views.append(view);
369
370 if !validate_utf8 && decoded_len <= inline_str_max_len {
372 values.truncate(start_offset);
373 }
374 }
375 *row = &row[offset..];
376 }
377
378 if validate_utf8 {
379 std::str::from_utf8(values.as_slice()).unwrap();
382 }
383
384 let builder = ArrayDataBuilder::new(DataType::BinaryView)
385 .len(len)
386 .null_count(null_count)
387 .null_bit_buffer(Some(nulls.into()))
388 .add_buffer(views.finish())
389 .add_buffer(values.into());
390
391 unsafe { BinaryViewArray::from(builder.build_unchecked()) }
394}
395
396pub fn decode_binary_view(rows: &mut [&[u8]], options: SortOptions) -> BinaryViewArray {
398 decode_binary_view_inner(rows, options, false)
399}
400
401pub unsafe fn decode_string<I: OffsetSizeTrait>(
407 rows: &mut [&[u8]],
408 options: SortOptions,
409 validate_utf8: bool,
410) -> GenericStringArray<I> {
411 let decoded = decode_binary::<I>(rows, options);
412
413 if validate_utf8 {
414 return GenericStringArray::from(decoded);
415 }
416
417 let builder = decoded
418 .into_data()
419 .into_builder()
420 .data_type(GenericStringArray::<I>::DATA_TYPE);
421
422 GenericStringArray::from(unsafe { builder.build_unchecked() })
425}
426
427pub unsafe fn decode_string_view(
433 rows: &mut [&[u8]],
434 options: SortOptions,
435 validate_utf8: bool,
436) -> StringViewArray {
437 let view = decode_binary_view_inner(rows, options, validate_utf8);
438 unsafe { view.to_string_view_unchecked() }
439}
440
441pub fn decode_null_value(rows: &mut [&[u8]], options: SortOptions) {
442 for row in rows.iter_mut() {
443 let (sentinel1, sentinel2) = match options.descending {
444 true => (!NON_EMPTY_SENTINEL, !NULL_VALUE_SENTINEL),
445 false => (NON_EMPTY_SENTINEL, NULL_VALUE_SENTINEL),
446 };
447 debug_assert_eq!(row[0], sentinel1, "Expected NULL_VALUE_SENTINEL at byte 0");
448 debug_assert_eq!(row[1], sentinel2, "Expected NULL_VALUE_SENTINEL at byte 1");
449 *row = &row[2..];
450 }
451}