1use crate::array::PrimitiveArray;
19use crate::null_sentinel;
20use arrow_array::builder::BufferBuilder;
21use arrow_array::{ArrowPrimitiveType, BooleanArray, FixedSizeBinaryArray};
22use arrow_buffer::{
23 bit_util, i256, ArrowNativeType, BooleanBuffer, Buffer, IntervalDayTime, IntervalMonthDayNano,
24 MutableBuffer, NullBuffer,
25};
26use arrow_data::{ArrayData, ArrayDataBuilder};
27use arrow_schema::{DataType, SortOptions};
28use half::f16;
29
30pub trait FromSlice {
31 fn from_slice(slice: &[u8], invert: bool) -> Self;
32}
33
34impl<const N: usize> FromSlice for [u8; N] {
35 #[inline]
36 fn from_slice(slice: &[u8], invert: bool) -> Self {
37 let mut t: Self = slice.try_into().unwrap();
38 if invert {
39 t.iter_mut().for_each(|o| *o = !*o);
40 }
41 t
42 }
43}
44
45pub trait FixedLengthEncoding: Copy {
48 const ENCODED_LEN: usize = 1 + std::mem::size_of::<Self::Encoded>();
49
50 type Encoded: Sized + Copy + FromSlice + AsRef<[u8]> + AsMut<[u8]>;
51
52 fn encode(self) -> Self::Encoded;
53
54 fn decode(encoded: Self::Encoded) -> Self;
55}
56
57impl FixedLengthEncoding for bool {
58 type Encoded = [u8; 1];
59
60 fn encode(self) -> [u8; 1] {
61 [self as u8]
62 }
63
64 fn decode(encoded: Self::Encoded) -> Self {
65 encoded[0] != 0
66 }
67}
68
69macro_rules! encode_signed {
70 ($n:expr, $t:ty) => {
71 impl FixedLengthEncoding for $t {
72 type Encoded = [u8; $n];
73
74 fn encode(self) -> [u8; $n] {
75 let mut b = self.to_be_bytes();
76 b[0] ^= 0x80;
78 b
79 }
80
81 fn decode(mut encoded: Self::Encoded) -> Self {
82 encoded[0] ^= 0x80;
84 Self::from_be_bytes(encoded)
85 }
86 }
87 };
88}
89
90encode_signed!(1, i8);
91encode_signed!(2, i16);
92encode_signed!(4, i32);
93encode_signed!(8, i64);
94encode_signed!(16, i128);
95encode_signed!(32, i256);
96
97macro_rules! encode_unsigned {
98 ($n:expr, $t:ty) => {
99 impl FixedLengthEncoding for $t {
100 type Encoded = [u8; $n];
101
102 fn encode(self) -> [u8; $n] {
103 self.to_be_bytes()
104 }
105
106 fn decode(encoded: Self::Encoded) -> Self {
107 Self::from_be_bytes(encoded)
108 }
109 }
110 };
111}
112
113encode_unsigned!(1, u8);
114encode_unsigned!(2, u16);
115encode_unsigned!(4, u32);
116encode_unsigned!(8, u64);
117
118impl FixedLengthEncoding for f16 {
119 type Encoded = [u8; 2];
120
121 fn encode(self) -> [u8; 2] {
122 let s = self.to_bits() as i16;
124 let val = s ^ (((s >> 15) as u16) >> 1) as i16;
125 val.encode()
126 }
127
128 fn decode(encoded: Self::Encoded) -> Self {
129 let bits = i16::decode(encoded);
130 let val = bits ^ (((bits >> 15) as u16) >> 1) as i16;
131 Self::from_bits(val as u16)
132 }
133}
134
135impl FixedLengthEncoding for f32 {
136 type Encoded = [u8; 4];
137
138 fn encode(self) -> [u8; 4] {
139 let s = self.to_bits() as i32;
141 let val = s ^ (((s >> 31) as u32) >> 1) as i32;
142 val.encode()
143 }
144
145 fn decode(encoded: Self::Encoded) -> Self {
146 let bits = i32::decode(encoded);
147 let val = bits ^ (((bits >> 31) as u32) >> 1) as i32;
148 Self::from_bits(val as u32)
149 }
150}
151
152impl FixedLengthEncoding for f64 {
153 type Encoded = [u8; 8];
154
155 fn encode(self) -> [u8; 8] {
156 let s = self.to_bits() as i64;
158 let val = s ^ (((s >> 63) as u64) >> 1) as i64;
159 val.encode()
160 }
161
162 fn decode(encoded: Self::Encoded) -> Self {
163 let bits = i64::decode(encoded);
164 let val = bits ^ (((bits >> 63) as u64) >> 1) as i64;
165 Self::from_bits(val as u64)
166 }
167}
168
169impl FixedLengthEncoding for IntervalDayTime {
170 type Encoded = [u8; 8];
171
172 fn encode(self) -> Self::Encoded {
173 let mut out = [0_u8; 8];
174 out[..4].copy_from_slice(&self.days.encode());
175 out[4..].copy_from_slice(&self.milliseconds.encode());
176 out
177 }
178
179 fn decode(encoded: Self::Encoded) -> Self {
180 Self {
181 days: i32::decode(encoded[..4].try_into().unwrap()),
182 milliseconds: i32::decode(encoded[4..].try_into().unwrap()),
183 }
184 }
185}
186
187impl FixedLengthEncoding for IntervalMonthDayNano {
188 type Encoded = [u8; 16];
189
190 fn encode(self) -> Self::Encoded {
191 let mut out = [0_u8; 16];
192 out[..4].copy_from_slice(&self.months.encode());
193 out[4..8].copy_from_slice(&self.days.encode());
194 out[8..].copy_from_slice(&self.nanoseconds.encode());
195 out
196 }
197
198 fn decode(encoded: Self::Encoded) -> Self {
199 Self {
200 months: i32::decode(encoded[..4].try_into().unwrap()),
201 days: i32::decode(encoded[4..8].try_into().unwrap()),
202 nanoseconds: i64::decode(encoded[8..].try_into().unwrap()),
203 }
204 }
205}
206
207pub const fn encoded_len<T>(_col: &PrimitiveArray<T>) -> usize
209where
210 T: ArrowPrimitiveType,
211 T::Native: FixedLengthEncoding,
212{
213 T::Native::ENCODED_LEN
214}
215
216pub fn encode<T: FixedLengthEncoding>(
221 data: &mut [u8],
222 offsets: &mut [usize],
223 values: &[T],
224 nulls: &NullBuffer,
225 opts: SortOptions,
226) {
227 for (value_idx, is_valid) in nulls.iter().enumerate() {
228 let offset = &mut offsets[value_idx + 1];
229 let end_offset = *offset + T::ENCODED_LEN;
230 if is_valid {
231 let to_write = &mut data[*offset..end_offset];
232 to_write[0] = 1;
233 let mut encoded = values[value_idx].encode();
234 if opts.descending {
235 encoded.as_mut().iter_mut().for_each(|v| *v = !*v)
237 }
238 to_write[1..].copy_from_slice(encoded.as_ref())
239 } else {
240 data[*offset] = null_sentinel(opts);
241 }
242 *offset = end_offset;
243 }
244}
245
246pub fn encode_not_null<T: FixedLengthEncoding>(
249 data: &mut [u8],
250 offsets: &mut [usize],
251 values: &[T],
252 opts: SortOptions,
253) {
254 for (value_idx, val) in values.iter().enumerate() {
255 let offset = &mut offsets[value_idx + 1];
256 let end_offset = *offset + T::ENCODED_LEN;
257
258 let to_write = &mut data[*offset..end_offset];
259 to_write[0] = 1;
260 let mut encoded = val.encode();
261 if opts.descending {
262 encoded.as_mut().iter_mut().for_each(|v| *v = !*v)
264 }
265 to_write[1..].copy_from_slice(encoded.as_ref());
266
267 *offset = end_offset;
268 }
269}
270
271pub fn encode_boolean(
276 data: &mut [u8],
277 offsets: &mut [usize],
278 values: &BooleanBuffer,
279 nulls: &NullBuffer,
280 opts: SortOptions,
281) {
282 for (idx, is_valid) in nulls.iter().enumerate() {
283 let offset = &mut offsets[idx + 1];
284 let end_offset = *offset + bool::ENCODED_LEN;
285 if is_valid {
286 let to_write = &mut data[*offset..end_offset];
287 to_write[0] = 1;
288 let mut encoded = values.value(idx).encode();
289 if opts.descending {
290 encoded.as_mut().iter_mut().for_each(|v| *v = !*v)
292 }
293 to_write[1..].copy_from_slice(encoded.as_ref())
294 } else {
295 data[*offset] = null_sentinel(opts);
296 }
297 *offset = end_offset;
298 }
299}
300
301pub fn encode_boolean_not_null(
304 data: &mut [u8],
305 offsets: &mut [usize],
306 values: &BooleanBuffer,
307 opts: SortOptions,
308) {
309 for (value_idx, val) in values.iter().enumerate() {
310 let offset = &mut offsets[value_idx + 1];
311 let end_offset = *offset + bool::ENCODED_LEN;
312
313 let to_write = &mut data[*offset..end_offset];
314 to_write[0] = 1;
315 let mut encoded = val.encode();
316 if opts.descending {
317 encoded.as_mut().iter_mut().for_each(|v| *v = !*v)
319 }
320 to_write[1..].copy_from_slice(encoded.as_ref());
321
322 *offset = end_offset;
323 }
324}
325
326pub fn encode_fixed_size_binary(
327 data: &mut [u8],
328 offsets: &mut [usize],
329 array: &FixedSizeBinaryArray,
330 opts: SortOptions,
331) {
332 let len = array.value_length() as usize;
333 for (offset, maybe_val) in offsets.iter_mut().skip(1).zip(array.iter()) {
334 let end_offset = *offset + len + 1;
335 if let Some(val) = maybe_val {
336 let to_write = &mut data[*offset..end_offset];
337 to_write[0] = 1;
338 to_write[1..].copy_from_slice(&val[..len]);
339 if opts.descending {
340 to_write[1..1 + len].iter_mut().for_each(|v| *v = !*v)
342 }
343 } else {
344 data[*offset] = null_sentinel(opts);
345 }
346 *offset = end_offset;
347 }
348}
349
350#[inline]
352fn split_off<'a>(src: &mut &'a [u8], len: usize) -> &'a [u8] {
353 let v = &src[..len];
354 *src = &src[len..];
355 v
356}
357
358pub fn decode_bool(rows: &mut [&[u8]], options: SortOptions) -> BooleanArray {
360 let true_val = match options.descending {
361 true => !1,
362 false => 1,
363 };
364
365 let len = rows.len();
366
367 let mut null_count = 0;
368 let mut nulls = MutableBuffer::new(bit_util::ceil(len, 64) * 8);
369 let mut values = MutableBuffer::new(bit_util::ceil(len, 64) * 8);
370
371 let chunks = len / 64;
372 let remainder = len % 64;
373 for chunk in 0..chunks {
374 let mut null_packed = 0;
375 let mut values_packed = 0;
376
377 for bit_idx in 0..64 {
378 let i = split_off(&mut rows[bit_idx + chunk * 64], 2);
379 let (null, value) = (i[0] == 1, i[1] == true_val);
380 null_count += !null as usize;
381 null_packed |= (null as u64) << bit_idx;
382 values_packed |= (value as u64) << bit_idx;
383 }
384
385 nulls.push(null_packed);
386 values.push(values_packed);
387 }
388
389 if remainder != 0 {
390 let mut null_packed = 0;
391 let mut values_packed = 0;
392
393 for bit_idx in 0..remainder {
394 let i = split_off(&mut rows[bit_idx + chunks * 64], 2);
395 let (null, value) = (i[0] == 1, i[1] == true_val);
396 null_count += !null as usize;
397 null_packed |= (null as u64) << bit_idx;
398 values_packed |= (value as u64) << bit_idx;
399 }
400
401 nulls.push(null_packed);
402 values.push(values_packed);
403 }
404
405 let builder = ArrayDataBuilder::new(DataType::Boolean)
406 .len(rows.len())
407 .null_count(null_count)
408 .add_buffer(values.into())
409 .null_bit_buffer(Some(nulls.into()));
410
411 unsafe { BooleanArray::from(builder.build_unchecked()) }
414}
415
416pub fn decode_nulls(rows: &[&[u8]]) -> (usize, Buffer) {
421 let mut null_count = 0;
422 let buffer = MutableBuffer::collect_bool(rows.len(), |idx| {
423 let valid = rows[idx][0] == 1;
424 null_count += !valid as usize;
425 valid
426 })
427 .into();
428 (null_count, buffer)
429}
430
431unsafe fn decode_fixed<T: FixedLengthEncoding + ArrowNativeType>(
437 rows: &mut [&[u8]],
438 data_type: DataType,
439 options: SortOptions,
440) -> ArrayData {
441 let len = rows.len();
442
443 let mut values = BufferBuilder::<T>::new(len);
444 let (null_count, nulls) = decode_nulls(rows);
445
446 for row in rows {
447 let i = split_off(row, T::ENCODED_LEN);
448 let value = T::Encoded::from_slice(&i[1..], options.descending);
449 values.append(T::decode(value));
450 }
451
452 let builder = ArrayDataBuilder::new(data_type)
453 .len(len)
454 .null_count(null_count)
455 .add_buffer(values.finish())
456 .null_bit_buffer(Some(nulls));
457
458 builder.build_unchecked()
460}
461
462pub fn decode_primitive<T: ArrowPrimitiveType>(
464 rows: &mut [&[u8]],
465 data_type: DataType,
466 options: SortOptions,
467) -> PrimitiveArray<T>
468where
469 T::Native: FixedLengthEncoding,
470{
471 assert!(PrimitiveArray::<T>::is_compatible(&data_type));
472 unsafe { decode_fixed::<T::Native>(rows, data_type, options).into() }
475}
476
477pub fn decode_fixed_size_binary(
479 rows: &mut [&[u8]],
480 size: i32,
481 options: SortOptions,
482) -> FixedSizeBinaryArray {
483 let len = rows.len();
484
485 let mut values = MutableBuffer::new(size as usize * rows.len());
486 let (null_count, nulls) = decode_nulls(rows);
487
488 let encoded_len = size as usize + 1;
489
490 for row in rows {
491 let i = split_off(row, encoded_len);
492 values.extend_from_slice(&i[1..]);
493 }
494
495 if options.descending {
496 for v in values.as_slice_mut() {
497 *v = !*v;
498 }
499 }
500
501 let builder = ArrayDataBuilder::new(DataType::FixedSizeBinary(size))
502 .len(len)
503 .null_count(null_count)
504 .add_buffer(values.into())
505 .null_bit_buffer(Some(nulls));
506
507 unsafe { builder.build_unchecked().into() }
509}