1use crate::null_sentinel;
19use arrow_array::builder::BufferBuilder;
20use arrow_array::types::ByteArrayType;
21use arrow_array::*;
22use arrow_buffer::bit_util::ceil;
23use arrow_buffer::{ArrowNativeType, MutableBuffer};
24use arrow_data::{ArrayDataBuilder, MAX_INLINE_VIEW_LEN};
25use arrow_schema::{DataType, SortOptions};
26use builder::make_view;
27
/// Number of payload bytes carried by each full-size block of the
/// variable-length encoding.
pub const BLOCK_SIZE: usize = 32;

/// Number of mini-blocks that the first [`BLOCK_SIZE`] bytes of a value are
/// split into.
pub const MINI_BLOCK_COUNT: usize = 4;

/// Number of payload bytes carried by each mini-block.
pub const MINI_BLOCK_SIZE: usize = BLOCK_SIZE / MINI_BLOCK_COUNT;

/// Trailing byte written after a block when further blocks follow it.
pub const BLOCK_CONTINUATION: u8 = 0xFF;

/// Leading byte used for an empty (zero-length, non-null) value.
pub const EMPTY_SENTINEL: u8 = 1;

/// Leading byte used for a non-empty value; it is followed by one or more blocks.
pub const NON_EMPTY_SENTINEL: u8 = 2;
47
48#[inline]
50pub fn encoded_len(a: Option<&[u8]>) -> usize {
51 padded_length(a.map(|x| x.len()))
52}
53
54#[inline]
56pub fn padded_length(a: Option<usize>) -> usize {
57 match a {
58 Some(a) if a <= BLOCK_SIZE => 1 + ceil(a, MINI_BLOCK_SIZE) * (MINI_BLOCK_SIZE + 1),
59 Some(a) => MINI_BLOCK_COUNT + ceil(a, BLOCK_SIZE) * (BLOCK_SIZE + 1),
62 None => 1,
63 }
64}
65
66pub fn encode<'a, I: Iterator<Item = Option<&'a [u8]>>>(
78 data: &mut [u8],
79 offsets: &mut [usize],
80 i: I,
81 opts: SortOptions,
82) {
83 for (offset, maybe_val) in offsets.iter_mut().skip(1).zip(i) {
84 *offset += encode_one(&mut data[*offset..], maybe_val, opts);
85 }
86}
87
88pub(crate) fn encode_generic_byte_array<T: ByteArrayType>(
90 data: &mut [u8],
91 offsets: &mut [usize],
92 input_array: &GenericByteArray<T>,
93 opts: SortOptions,
94) {
95 let input_offsets = input_array.value_offsets();
96 let bytes = input_array.values().as_slice();
97
98 if let Some(null_buffer) = input_array.nulls().filter(|x| x.null_count() > 0) {
99 let input_iter =
100 input_offsets
101 .windows(2)
102 .zip(null_buffer.iter())
103 .map(|(start_end, is_valid)| {
104 if is_valid {
105 let item_range = start_end[0].as_usize()..start_end[1].as_usize();
106 let item = unsafe { bytes.get_unchecked(item_range) };
109 Some(item)
110 } else {
111 None
112 }
113 });
114
115 encode(data, offsets, input_iter, opts);
116 } else {
117 let input_iter = input_offsets.windows(2).map(|start_end| {
119 let item_range = start_end[0].as_usize()..start_end[1].as_usize();
120 let item = unsafe { bytes.get_unchecked(item_range) };
123 Some(item)
124 });
125
126 encode(data, offsets, input_iter, opts);
127 }
128}
129
130pub fn encode_null(out: &mut [u8], opts: SortOptions) -> usize {
131 out[0] = null_sentinel(opts);
132 1
133}
134
135pub fn encode_empty(out: &mut [u8], opts: SortOptions) -> usize {
136 out[0] = match opts.descending {
137 true => !EMPTY_SENTINEL,
138 false => EMPTY_SENTINEL,
139 };
140 1
141}
142
143#[inline]
144pub fn encode_one(out: &mut [u8], val: Option<&[u8]>, opts: SortOptions) -> usize {
145 match val {
146 None => encode_null(out, opts),
147 Some([]) => encode_empty(out, opts),
148 Some(val) => {
149 out[0] = NON_EMPTY_SENTINEL;
151
152 let len = if val.len() <= BLOCK_SIZE {
153 1 + encode_blocks::<MINI_BLOCK_SIZE>(&mut out[1..], val)
154 } else {
155 let (initial, rem) = val.split_at(BLOCK_SIZE);
156 let offset = encode_blocks::<MINI_BLOCK_SIZE>(&mut out[1..], initial);
157 out[offset] = BLOCK_CONTINUATION;
158 1 + offset + encode_blocks::<BLOCK_SIZE>(&mut out[1 + offset..], rem)
159 };
160
161 if opts.descending {
162 out[..len].iter_mut().for_each(|v| *v = !*v)
164 }
165 len
166 }
167 }
168}
169
170#[inline]
172fn encode_blocks<const SIZE: usize>(out: &mut [u8], val: &[u8]) -> usize {
173 let block_count = ceil(val.len(), SIZE);
174 let end_offset = block_count * (SIZE + 1);
175 let to_write = &mut out[..end_offset];
176
177 let chunks = val.chunks_exact(SIZE);
178 let remainder = chunks.remainder();
179 for (input, output) in chunks.clone().zip(to_write.chunks_exact_mut(SIZE + 1)) {
180 let input: &[u8; SIZE] = input.try_into().unwrap();
181 let out_block: &mut [u8; SIZE] = (&mut output[..SIZE]).try_into().unwrap();
182
183 *out_block = *input;
184
185 output[SIZE] = BLOCK_CONTINUATION;
187 }
188
189 if !remainder.is_empty() {
190 let start_offset = (block_count - 1) * (SIZE + 1);
191 to_write[start_offset..start_offset + remainder.len()].copy_from_slice(remainder);
192 *to_write.last_mut().unwrap() = remainder.len() as u8;
193 } else {
194 *to_write.last_mut().unwrap() = SIZE as u8;
196 }
197 end_offset
198}
199
200pub fn decode_blocks(row: &[u8], options: SortOptions, mut f: impl FnMut(&[u8])) -> usize {
203 let (non_empty_sentinel, continuation) = match options.descending {
204 true => (!NON_EMPTY_SENTINEL, !BLOCK_CONTINUATION),
205 false => (NON_EMPTY_SENTINEL, BLOCK_CONTINUATION),
206 };
207
208 if row[0] != non_empty_sentinel {
209 return 1;
211 }
212
213 let block_len = |sentinel: u8| match options.descending {
215 true => !sentinel as usize,
216 false => sentinel as usize,
217 };
218
219 let mut idx = 1;
220 for _ in 0..MINI_BLOCK_COUNT {
221 let sentinel = row[idx + MINI_BLOCK_SIZE];
222 if sentinel != continuation {
223 f(&row[idx..idx + block_len(sentinel)]);
224 return idx + MINI_BLOCK_SIZE + 1;
225 }
226 f(&row[idx..idx + MINI_BLOCK_SIZE]);
227 idx += MINI_BLOCK_SIZE + 1;
228 }
229
230 loop {
231 let sentinel = row[idx + BLOCK_SIZE];
232 if sentinel != continuation {
233 f(&row[idx..idx + block_len(sentinel)]);
234 return idx + BLOCK_SIZE + 1;
235 }
236 f(&row[idx..idx + BLOCK_SIZE]);
237 idx += BLOCK_SIZE + 1;
238 }
239}
240
241fn decoded_len(row: &[u8], options: SortOptions) -> usize {
243 let mut len = 0;
244 decode_blocks(row, options, |block| len += block.len());
245 len
246}
247
248pub fn decode_binary<I: OffsetSizeTrait>(
250 rows: &mut [&[u8]],
251 options: SortOptions,
252) -> GenericBinaryArray<I> {
253 let len = rows.len();
254 let mut null_count = 0;
255 let nulls = MutableBuffer::collect_bool(len, |x| {
256 let valid = rows[x][0] != null_sentinel(options);
257 null_count += !valid as usize;
258 valid
259 });
260
261 let values_capacity = rows.iter().map(|row| decoded_len(row, options)).sum();
262 let mut offsets = BufferBuilder::<I>::new(len + 1);
263 offsets.append(I::zero());
264 let mut values = MutableBuffer::new(values_capacity);
265
266 for row in rows {
267 let offset = decode_blocks(row, options, |b| values.extend_from_slice(b));
268 *row = &row[offset..];
269 offsets.append(I::from_usize(values.len()).expect("offset overflow"))
270 }
271
272 if options.descending {
273 values.as_slice_mut().iter_mut().for_each(|o| *o = !*o)
274 }
275
276 let d = match I::IS_LARGE {
277 true => DataType::LargeBinary,
278 false => DataType::Binary,
279 };
280
281 let builder = ArrayDataBuilder::new(d)
282 .len(len)
283 .null_count(null_count)
284 .null_bit_buffer(Some(nulls.into()))
285 .add_buffer(offsets.finish())
286 .add_buffer(values.into());
287
288 unsafe { GenericBinaryArray::from(builder.build_unchecked()) }
291}
292
293fn decode_binary_view_inner(
294 rows: &mut [&[u8]],
295 options: SortOptions,
296 validate_utf8: bool,
297) -> BinaryViewArray {
298 let len = rows.len();
299 let inline_str_max_len = MAX_INLINE_VIEW_LEN as usize;
300
301 let mut null_count = 0;
302
303 let nulls = MutableBuffer::collect_bool(len, |x| {
304 let valid = rows[x][0] != null_sentinel(options);
305 null_count += !valid as usize;
306 valid
307 });
308
309 let values_capacity = if validate_utf8 {
314 rows.iter().map(|row| decoded_len(row, options)).sum()
316 } else {
317 rows.iter().fold(0, |acc, row| {
319 let len = decoded_len(row, options);
320 if len > inline_str_max_len {
321 acc + len
322 } else {
323 acc
324 }
325 }) + inline_str_max_len
326 };
327 let mut values = MutableBuffer::new(values_capacity);
328
329 let mut views = BufferBuilder::<u128>::new(len);
330 for row in rows {
331 let start_offset = values.len();
332 let offset = decode_blocks(row, options, |b| values.extend_from_slice(b));
333 let decoded_len = values.len() - start_offset;
336 if row[0] == null_sentinel(options) {
337 debug_assert_eq!(offset, 1);
338 debug_assert_eq!(start_offset, values.len());
339 views.append(0);
340 } else {
341 let val = unsafe { values.get_unchecked_mut(start_offset..) };
343
344 if options.descending {
345 val.iter_mut().for_each(|o| *o = !*o);
346 }
347
348 let view = make_view(val, 0, start_offset as u32);
349 views.append(view);
350
351 if !validate_utf8 && decoded_len <= inline_str_max_len {
353 values.truncate(start_offset);
354 }
355 }
356 *row = &row[offset..];
357 }
358
359 if validate_utf8 {
360 std::str::from_utf8(values.as_slice()).unwrap();
363 }
364
365 let builder = ArrayDataBuilder::new(DataType::BinaryView)
366 .len(len)
367 .null_count(null_count)
368 .null_bit_buffer(Some(nulls.into()))
369 .add_buffer(views.finish())
370 .add_buffer(values.into());
371
372 unsafe { BinaryViewArray::from(builder.build_unchecked()) }
375}
376
377pub fn decode_binary_view(rows: &mut [&[u8]], options: SortOptions) -> BinaryViewArray {
379 decode_binary_view_inner(rows, options, false)
380}
381
382pub unsafe fn decode_string<I: OffsetSizeTrait>(
388 rows: &mut [&[u8]],
389 options: SortOptions,
390 validate_utf8: bool,
391) -> GenericStringArray<I> {
392 let decoded = decode_binary::<I>(rows, options);
393
394 if validate_utf8 {
395 return GenericStringArray::from(decoded);
396 }
397
398 let builder = decoded
399 .into_data()
400 .into_builder()
401 .data_type(GenericStringArray::<I>::DATA_TYPE);
402
403 GenericStringArray::from(unsafe { builder.build_unchecked() })
406}
407
408pub unsafe fn decode_string_view(
414 rows: &mut [&[u8]],
415 options: SortOptions,
416 validate_utf8: bool,
417) -> StringViewArray {
418 let view = decode_binary_view_inner(rows, options, validate_utf8);
419 unsafe { view.to_string_view_unchecked() }
420}