1use crate::null_sentinel;
19use arrow_array::builder::BufferBuilder;
20use arrow_array::types::ByteArrayType;
21use arrow_array::*;
22use arrow_buffer::bit_util::ceil;
23use arrow_buffer::{ArrowNativeType, MutableBuffer};
24use arrow_data::{ArrayDataBuilder, MAX_INLINE_VIEW_LEN};
25use arrow_schema::{DataType, SortOptions};
26use builder::make_view;
27
/// Number of payload bytes carried by one full-size block of the
/// variable-length encoding.
pub const BLOCK_SIZE: usize = 32;

/// Number of mini-blocks that the first [`BLOCK_SIZE`] bytes of a value are
/// split into, so that short values do not pay for a whole block.
pub const MINI_BLOCK_COUNT: usize = 4;

/// Number of payload bytes carried by one mini-block.
pub const MINI_BLOCK_SIZE: usize = BLOCK_SIZE / MINI_BLOCK_COUNT;

/// Trailer byte of a block that is followed by a further block.
///
/// Any other trailer value is the number of payload bytes used in the final
/// block.
pub const BLOCK_CONTINUATION: u8 = u8::MAX;

/// Leading byte of an encoded value that is present but empty.
pub const EMPTY_SENTINEL: u8 = 1;

/// Leading byte of an encoded value that is present and non-empty.
pub const NON_EMPTY_SENTINEL: u8 = 2;
47
48#[inline]
50pub fn padded_length(a: Option<usize>) -> usize {
51 match a {
52 Some(a) => non_null_padded_length(a),
53 None => 1,
54 }
55}
56
57#[inline]
59pub(crate) fn non_null_padded_length(len: usize) -> usize {
60 if len <= BLOCK_SIZE {
61 1 + ceil(len, MINI_BLOCK_SIZE) * (MINI_BLOCK_SIZE + 1)
62 } else {
63 MINI_BLOCK_COUNT + ceil(len, BLOCK_SIZE) * (BLOCK_SIZE + 1)
66 }
67}
68
69pub fn encode<'a, I: Iterator<Item = Option<&'a [u8]>>>(
81 data: &mut [u8],
82 offsets: &mut [usize],
83 i: I,
84 opts: SortOptions,
85) {
86 for (offset, maybe_val) in offsets.iter_mut().skip(1).zip(i) {
87 *offset += encode_one(&mut data[*offset..], maybe_val, opts);
88 }
89}
90
91pub(crate) fn encode_generic_byte_array<T: ByteArrayType>(
93 data: &mut [u8],
94 offsets: &mut [usize],
95 input_array: &GenericByteArray<T>,
96 opts: SortOptions,
97) {
98 let input_offsets = input_array.value_offsets();
99 let bytes = input_array.values().as_slice();
100
101 if let Some(null_buffer) = input_array.nulls().filter(|x| x.null_count() > 0) {
102 let input_iter =
103 input_offsets
104 .windows(2)
105 .zip(null_buffer.iter())
106 .map(|(start_end, is_valid)| {
107 if is_valid {
108 let item_range = start_end[0].as_usize()..start_end[1].as_usize();
109 let item = unsafe { bytes.get_unchecked(item_range) };
112 Some(item)
113 } else {
114 None
115 }
116 });
117
118 encode(data, offsets, input_iter, opts);
119 } else {
120 let input_iter = input_offsets.windows(2).map(|start_end| {
122 let item_range = start_end[0].as_usize()..start_end[1].as_usize();
123 let item = unsafe { bytes.get_unchecked(item_range) };
126 Some(item)
127 });
128
129 encode(data, offsets, input_iter, opts);
130 }
131}
132
133pub fn encode_null(out: &mut [u8], opts: SortOptions) -> usize {
134 out[0] = null_sentinel(opts);
135 1
136}
137
138pub fn encode_empty(out: &mut [u8], opts: SortOptions) -> usize {
139 out[0] = match opts.descending {
140 true => !EMPTY_SENTINEL,
141 false => EMPTY_SENTINEL,
142 };
143 1
144}
145
146#[inline]
147pub fn encode_one(out: &mut [u8], val: Option<&[u8]>, opts: SortOptions) -> usize {
148 match val {
149 None => encode_null(out, opts),
150 Some([]) => encode_empty(out, opts),
151 Some(val) => {
152 out[0] = NON_EMPTY_SENTINEL;
154
155 let len = if val.len() <= BLOCK_SIZE {
156 1 + encode_blocks::<MINI_BLOCK_SIZE>(&mut out[1..], val)
157 } else {
158 let (initial, rem) = val.split_at(BLOCK_SIZE);
159 let offset = encode_blocks::<MINI_BLOCK_SIZE>(&mut out[1..], initial);
160 out[offset] = BLOCK_CONTINUATION;
161 1 + offset + encode_blocks::<BLOCK_SIZE>(&mut out[1 + offset..], rem)
162 };
163
164 if opts.descending {
165 out[..len].iter_mut().for_each(|v| *v = !*v)
167 }
168 len
169 }
170 }
171}
172
173#[inline]
175fn encode_blocks<const SIZE: usize>(out: &mut [u8], val: &[u8]) -> usize {
176 let block_count = ceil(val.len(), SIZE);
177 let end_offset = block_count * (SIZE + 1);
178 let to_write = &mut out[..end_offset];
179
180 let chunks = val.chunks_exact(SIZE);
181 let remainder = chunks.remainder();
182 for (input, output) in chunks.clone().zip(to_write.chunks_exact_mut(SIZE + 1)) {
183 let input: &[u8; SIZE] = input.try_into().unwrap();
184 let out_block: &mut [u8; SIZE] = (&mut output[..SIZE]).try_into().unwrap();
185
186 *out_block = *input;
187
188 output[SIZE] = BLOCK_CONTINUATION;
190 }
191
192 if !remainder.is_empty() {
193 let start_offset = (block_count - 1) * (SIZE + 1);
194 to_write[start_offset..start_offset + remainder.len()].copy_from_slice(remainder);
195 *to_write.last_mut().unwrap() = remainder.len() as u8;
196 } else {
197 *to_write.last_mut().unwrap() = SIZE as u8;
199 }
200 end_offset
201}
202
203pub fn decode_blocks(row: &[u8], options: SortOptions, mut f: impl FnMut(&[u8])) -> usize {
206 let (non_empty_sentinel, continuation) = match options.descending {
207 true => (!NON_EMPTY_SENTINEL, !BLOCK_CONTINUATION),
208 false => (NON_EMPTY_SENTINEL, BLOCK_CONTINUATION),
209 };
210
211 if row[0] != non_empty_sentinel {
212 return 1;
214 }
215
216 let block_len = |sentinel: u8| match options.descending {
218 true => !sentinel as usize,
219 false => sentinel as usize,
220 };
221
222 let mut idx = 1;
223 for _ in 0..MINI_BLOCK_COUNT {
224 let sentinel = row[idx + MINI_BLOCK_SIZE];
225 if sentinel != continuation {
226 f(&row[idx..idx + block_len(sentinel)]);
227 return idx + MINI_BLOCK_SIZE + 1;
228 }
229 f(&row[idx..idx + MINI_BLOCK_SIZE]);
230 idx += MINI_BLOCK_SIZE + 1;
231 }
232
233 loop {
234 let sentinel = row[idx + BLOCK_SIZE];
235 if sentinel != continuation {
236 f(&row[idx..idx + block_len(sentinel)]);
237 return idx + BLOCK_SIZE + 1;
238 }
239 f(&row[idx..idx + BLOCK_SIZE]);
240 idx += BLOCK_SIZE + 1;
241 }
242}
243
244fn decoded_len(row: &[u8], options: SortOptions) -> usize {
246 let mut len = 0;
247 decode_blocks(row, options, |block| len += block.len());
248 len
249}
250
251pub fn decode_binary<I: OffsetSizeTrait>(
253 rows: &mut [&[u8]],
254 options: SortOptions,
255) -> GenericBinaryArray<I> {
256 let len = rows.len();
257 let mut null_count = 0;
258 let nulls = MutableBuffer::collect_bool(len, |x| {
259 let valid = rows[x][0] != null_sentinel(options);
260 null_count += !valid as usize;
261 valid
262 });
263
264 let values_capacity = rows.iter().map(|row| decoded_len(row, options)).sum();
265 let mut offsets = BufferBuilder::<I>::new(len + 1);
266 offsets.append(I::zero());
267 let mut values = MutableBuffer::new(values_capacity);
268
269 for row in rows {
270 let offset = decode_blocks(row, options, |b| values.extend_from_slice(b));
271 *row = &row[offset..];
272 offsets.append(I::from_usize(values.len()).expect("offset overflow"))
273 }
274
275 if options.descending {
276 values.as_slice_mut().iter_mut().for_each(|o| *o = !*o)
277 }
278
279 let d = match I::IS_LARGE {
280 true => DataType::LargeBinary,
281 false => DataType::Binary,
282 };
283
284 let builder = ArrayDataBuilder::new(d)
285 .len(len)
286 .null_count(null_count)
287 .null_bit_buffer(Some(nulls.into()))
288 .add_buffer(offsets.finish())
289 .add_buffer(values.into());
290
291 unsafe { GenericBinaryArray::from(builder.build_unchecked()) }
294}
295
296fn decode_binary_view_inner(
297 rows: &mut [&[u8]],
298 options: SortOptions,
299 validate_utf8: bool,
300) -> BinaryViewArray {
301 let len = rows.len();
302 let inline_str_max_len = MAX_INLINE_VIEW_LEN as usize;
303
304 let mut null_count = 0;
305
306 let nulls = MutableBuffer::collect_bool(len, |x| {
307 let valid = rows[x][0] != null_sentinel(options);
308 null_count += !valid as usize;
309 valid
310 });
311
312 let values_capacity = if validate_utf8 {
317 rows.iter().map(|row| decoded_len(row, options)).sum()
319 } else {
320 rows.iter().fold(0, |acc, row| {
322 let len = decoded_len(row, options);
323 if len > inline_str_max_len {
324 acc + len
325 } else {
326 acc
327 }
328 }) + inline_str_max_len
329 };
330 let mut values = MutableBuffer::new(values_capacity);
331
332 let mut views = BufferBuilder::<u128>::new(len);
333 for row in rows {
334 let start_offset = values.len();
335 let offset = decode_blocks(row, options, |b| values.extend_from_slice(b));
336 let decoded_len = values.len() - start_offset;
339 if row[0] == null_sentinel(options) {
340 debug_assert_eq!(offset, 1);
341 debug_assert_eq!(start_offset, values.len());
342 views.append(0);
343 } else {
344 let val = unsafe { values.get_unchecked_mut(start_offset..) };
346
347 if options.descending {
348 val.iter_mut().for_each(|o| *o = !*o);
349 }
350
351 let view = make_view(val, 0, start_offset as u32);
352 views.append(view);
353
354 if !validate_utf8 && decoded_len <= inline_str_max_len {
356 values.truncate(start_offset);
357 }
358 }
359 *row = &row[offset..];
360 }
361
362 if validate_utf8 {
363 std::str::from_utf8(values.as_slice()).unwrap();
366 }
367
368 let builder = ArrayDataBuilder::new(DataType::BinaryView)
369 .len(len)
370 .null_count(null_count)
371 .null_bit_buffer(Some(nulls.into()))
372 .add_buffer(views.finish())
373 .add_buffer(values.into());
374
375 unsafe { BinaryViewArray::from(builder.build_unchecked()) }
378}
379
380pub fn decode_binary_view(rows: &mut [&[u8]], options: SortOptions) -> BinaryViewArray {
382 decode_binary_view_inner(rows, options, false)
383}
384
385pub unsafe fn decode_string<I: OffsetSizeTrait>(
391 rows: &mut [&[u8]],
392 options: SortOptions,
393 validate_utf8: bool,
394) -> GenericStringArray<I> {
395 let decoded = decode_binary::<I>(rows, options);
396
397 if validate_utf8 {
398 return GenericStringArray::from(decoded);
399 }
400
401 let builder = decoded
402 .into_data()
403 .into_builder()
404 .data_type(GenericStringArray::<I>::DATA_TYPE);
405
406 GenericStringArray::from(unsafe { builder.build_unchecked() })
409}
410
411pub unsafe fn decode_string_view(
417 rows: &mut [&[u8]],
418 options: SortOptions,
419 validate_utf8: bool,
420) -> StringViewArray {
421 let view = decode_binary_view_inner(rows, options, validate_utf8);
422 unsafe { view.to_string_view_unchecked() }
423}