1use crate::null_sentinel;
19use arrow_array::builder::BufferBuilder;
20use arrow_array::*;
21use arrow_buffer::bit_util::ceil;
22use arrow_buffer::MutableBuffer;
23use arrow_data::ArrayDataBuilder;
24use arrow_schema::{DataType, SortOptions};
25use builder::make_view;
26
/// Number of payload bytes carried by a full-size block.
///
/// Each block is followed by one extra marker byte, so a full block occupies
/// `BLOCK_SIZE + 1` bytes in the encoded output.
pub const BLOCK_SIZE: usize = 32;

/// Number of mini-blocks that the first `BLOCK_SIZE` bytes of a value are
/// split into.
pub const MINI_BLOCK_COUNT: usize = 4;

/// Number of payload bytes carried by a mini-block.
pub const MINI_BLOCK_SIZE: usize = BLOCK_SIZE / MINI_BLOCK_COUNT;

/// Marker byte written after a block to signal that another block follows.
pub const BLOCK_CONTINUATION: u8 = 0xFF;

/// Leading byte used for an empty (zero-length, non-null) value.
///
/// The byte is bit-inverted when encoding with descending sort order.
pub const EMPTY_SENTINEL: u8 = 1;

/// Leading byte used for a non-empty value; it is followed by one or more
/// blocks.
///
/// The byte is bit-inverted when encoding with descending sort order.
pub const NON_EMPTY_SENTINEL: u8 = 2;
46
47#[inline]
49pub fn encoded_len(a: Option<&[u8]>) -> usize {
50 padded_length(a.map(|x| x.len()))
51}
52
53#[inline]
55pub fn padded_length(a: Option<usize>) -> usize {
56 match a {
57 Some(a) if a <= BLOCK_SIZE => 1 + ceil(a, MINI_BLOCK_SIZE) * (MINI_BLOCK_SIZE + 1),
58 Some(a) => MINI_BLOCK_COUNT + ceil(a, BLOCK_SIZE) * (BLOCK_SIZE + 1),
61 None => 1,
62 }
63}
64
65pub fn encode<'a, I: Iterator<Item = Option<&'a [u8]>>>(
77 data: &mut [u8],
78 offsets: &mut [usize],
79 i: I,
80 opts: SortOptions,
81) {
82 for (offset, maybe_val) in offsets.iter_mut().skip(1).zip(i) {
83 *offset += encode_one(&mut data[*offset..], maybe_val, opts);
84 }
85}
86
87pub fn encode_null(out: &mut [u8], opts: SortOptions) -> usize {
88 out[0] = null_sentinel(opts);
89 1
90}
91
92pub fn encode_empty(out: &mut [u8], opts: SortOptions) -> usize {
93 out[0] = match opts.descending {
94 true => !EMPTY_SENTINEL,
95 false => EMPTY_SENTINEL,
96 };
97 1
98}
99
100pub fn encode_one(out: &mut [u8], val: Option<&[u8]>, opts: SortOptions) -> usize {
101 match val {
102 None => encode_null(out, opts),
103 Some([]) => encode_empty(out, opts),
104 Some(val) => {
105 out[0] = NON_EMPTY_SENTINEL;
107
108 let len = if val.len() <= BLOCK_SIZE {
109 1 + encode_blocks::<MINI_BLOCK_SIZE>(&mut out[1..], val)
110 } else {
111 let (initial, rem) = val.split_at(BLOCK_SIZE);
112 let offset = encode_blocks::<MINI_BLOCK_SIZE>(&mut out[1..], initial);
113 out[offset] = BLOCK_CONTINUATION;
114 1 + offset + encode_blocks::<BLOCK_SIZE>(&mut out[1 + offset..], rem)
115 };
116
117 if opts.descending {
118 out[..len].iter_mut().for_each(|v| *v = !*v)
120 }
121 len
122 }
123 }
124}
125
126#[inline]
128fn encode_blocks<const SIZE: usize>(out: &mut [u8], val: &[u8]) -> usize {
129 let block_count = ceil(val.len(), SIZE);
130 let end_offset = block_count * (SIZE + 1);
131 let to_write = &mut out[..end_offset];
132
133 let chunks = val.chunks_exact(SIZE);
134 let remainder = chunks.remainder();
135 for (input, output) in chunks.clone().zip(to_write.chunks_exact_mut(SIZE + 1)) {
136 let input: &[u8; SIZE] = input.try_into().unwrap();
137 let out_block: &mut [u8; SIZE] = (&mut output[..SIZE]).try_into().unwrap();
138
139 *out_block = *input;
140
141 output[SIZE] = BLOCK_CONTINUATION;
143 }
144
145 if !remainder.is_empty() {
146 let start_offset = (block_count - 1) * (SIZE + 1);
147 to_write[start_offset..start_offset + remainder.len()].copy_from_slice(remainder);
148 *to_write.last_mut().unwrap() = remainder.len() as u8;
149 } else {
150 *to_write.last_mut().unwrap() = SIZE as u8;
152 }
153 end_offset
154}
155
156pub fn decode_blocks(row: &[u8], options: SortOptions, mut f: impl FnMut(&[u8])) -> usize {
159 let (non_empty_sentinel, continuation) = match options.descending {
160 true => (!NON_EMPTY_SENTINEL, !BLOCK_CONTINUATION),
161 false => (NON_EMPTY_SENTINEL, BLOCK_CONTINUATION),
162 };
163
164 if row[0] != non_empty_sentinel {
165 return 1;
167 }
168
169 let block_len = |sentinel: u8| match options.descending {
171 true => !sentinel as usize,
172 false => sentinel as usize,
173 };
174
175 let mut idx = 1;
176 for _ in 0..MINI_BLOCK_COUNT {
177 let sentinel = row[idx + MINI_BLOCK_SIZE];
178 if sentinel != continuation {
179 f(&row[idx..idx + block_len(sentinel)]);
180 return idx + MINI_BLOCK_SIZE + 1;
181 }
182 f(&row[idx..idx + MINI_BLOCK_SIZE]);
183 idx += MINI_BLOCK_SIZE + 1;
184 }
185
186 loop {
187 let sentinel = row[idx + BLOCK_SIZE];
188 if sentinel != continuation {
189 f(&row[idx..idx + block_len(sentinel)]);
190 return idx + BLOCK_SIZE + 1;
191 }
192 f(&row[idx..idx + BLOCK_SIZE]);
193 idx += BLOCK_SIZE + 1;
194 }
195}
196
197fn decoded_len(row: &[u8], options: SortOptions) -> usize {
199 let mut len = 0;
200 decode_blocks(row, options, |block| len += block.len());
201 len
202}
203
204pub fn decode_binary<I: OffsetSizeTrait>(
206 rows: &mut [&[u8]],
207 options: SortOptions,
208) -> GenericBinaryArray<I> {
209 let len = rows.len();
210 let mut null_count = 0;
211 let nulls = MutableBuffer::collect_bool(len, |x| {
212 let valid = rows[x][0] != null_sentinel(options);
213 null_count += !valid as usize;
214 valid
215 });
216
217 let values_capacity = rows.iter().map(|row| decoded_len(row, options)).sum();
218 let mut offsets = BufferBuilder::<I>::new(len + 1);
219 offsets.append(I::zero());
220 let mut values = MutableBuffer::new(values_capacity);
221
222 for row in rows {
223 let offset = decode_blocks(row, options, |b| values.extend_from_slice(b));
224 *row = &row[offset..];
225 offsets.append(I::from_usize(values.len()).expect("offset overflow"))
226 }
227
228 if options.descending {
229 values.as_slice_mut().iter_mut().for_each(|o| *o = !*o)
230 }
231
232 let d = match I::IS_LARGE {
233 true => DataType::LargeBinary,
234 false => DataType::Binary,
235 };
236
237 let builder = ArrayDataBuilder::new(d)
238 .len(len)
239 .null_count(null_count)
240 .null_bit_buffer(Some(nulls.into()))
241 .add_buffer(offsets.finish())
242 .add_buffer(values.into());
243
244 unsafe { GenericBinaryArray::from(builder.build_unchecked()) }
247}
248
249fn decode_binary_view_inner(
250 rows: &mut [&[u8]],
251 options: SortOptions,
252 check_utf8: bool,
253) -> BinaryViewArray {
254 let len = rows.len();
255
256 let mut null_count = 0;
257
258 let nulls = MutableBuffer::collect_bool(len, |x| {
259 let valid = rows[x][0] != null_sentinel(options);
260 null_count += !valid as usize;
261 valid
262 });
263
264 let values_capacity: usize = rows.iter().map(|row| decoded_len(row, options)).sum();
265 let mut values = MutableBuffer::new(values_capacity);
266 let mut views = BufferBuilder::<u128>::new(len);
267
268 for row in rows {
269 let start_offset = values.len();
270 let offset = decode_blocks(row, options, |b| values.extend_from_slice(b));
271 if row[0] == null_sentinel(options) {
272 debug_assert_eq!(offset, 1);
273 debug_assert_eq!(start_offset, values.len());
274 views.append(0);
275 } else {
276 let val = unsafe { values.get_unchecked_mut(start_offset..) };
278
279 if options.descending {
280 val.iter_mut().for_each(|o| *o = !*o);
281 }
282
283 let view = make_view(val, 0, start_offset as u32);
284 views.append(view);
285 }
286 *row = &row[offset..];
287 }
288
289 if check_utf8 {
290 std::str::from_utf8(values.as_slice()).unwrap();
293 }
294
295 let builder = ArrayDataBuilder::new(DataType::BinaryView)
296 .len(len)
297 .null_count(null_count)
298 .null_bit_buffer(Some(nulls.into()))
299 .add_buffer(views.finish())
300 .add_buffer(values.into());
301
302 unsafe { BinaryViewArray::from(builder.build_unchecked()) }
305}
306
307pub fn decode_binary_view(rows: &mut [&[u8]], options: SortOptions) -> BinaryViewArray {
309 decode_binary_view_inner(rows, options, false)
310}
311
312pub unsafe fn decode_string<I: OffsetSizeTrait>(
318 rows: &mut [&[u8]],
319 options: SortOptions,
320 validate_utf8: bool,
321) -> GenericStringArray<I> {
322 let decoded = decode_binary::<I>(rows, options);
323
324 if validate_utf8 {
325 return GenericStringArray::from(decoded);
326 }
327
328 let builder = decoded
329 .into_data()
330 .into_builder()
331 .data_type(GenericStringArray::<I>::DATA_TYPE);
332
333 GenericStringArray::from(builder.build_unchecked())
336}
337
338pub unsafe fn decode_string_view(
344 rows: &mut [&[u8]],
345 options: SortOptions,
346 validate_utf8: bool,
347) -> StringViewArray {
348 let view = decode_binary_view_inner(rows, options, validate_utf8);
349 view.to_string_view_unchecked()
350}