1use crate::{RowConverter, Rows, SortField, variable};
19use arrow_array::types::RunEndIndexType;
20use arrow_array::{PrimitiveArray, RunArray};
21use arrow_buffer::{ArrowNativeType, ScalarBuffer};
22use arrow_schema::{ArrowError, SortOptions};
23
24pub fn compute_lengths<R: RunEndIndexType>(
26 lengths: &mut [usize],
27 rows: &Rows,
28 array: &RunArray<R>,
29) {
30 let run_ends = array.run_ends().sliced_values();
31 let mut logical_start = 0;
32
33 for (physical_idx, run_end) in run_ends.enumerate() {
35 let logical_end = run_end.as_usize();
36 let row_len = rows.row_len(physical_idx);
37 let encoded_len = variable::padded_length(Some(row_len));
38
39 for length in &mut lengths[logical_start..logical_end] {
41 *length += encoded_len;
42 }
43
44 logical_start = logical_end;
45 }
46}
47
48pub fn encode<R: RunEndIndexType>(
52 data: &mut [u8],
53 offsets: &mut [usize],
54 rows: &Rows,
55 opts: SortOptions,
56 array: &RunArray<R>,
57) {
58 let run_ends = array.run_ends().sliced_values();
59 let mut logical_idx = 0;
60 let mut offset_idx = 1; for (physical_idx, run_end) in run_ends.enumerate() {
64 let run_end = run_end.as_usize();
65 let iteration_count = run_end - logical_idx;
66
67 let first_offset = offsets[offset_idx];
68 let out = &mut data[first_offset..];
69 let bytes_written = variable::encode_one(out, Some(rows.row(physical_idx).data), opts);
70 offsets[offset_idx] += bytes_written;
71 for i in 1..iteration_count {
73 let dst = offsets[offset_idx + i];
74 data.copy_within(first_offset..first_offset + bytes_written, dst);
75 offsets[offset_idx + i] += bytes_written;
76 }
77 logical_idx = run_end;
78 offset_idx += iteration_count;
79 }
80}
81
82pub unsafe fn decode<R: RunEndIndexType>(
88 converter: &RowConverter,
89 rows: &mut [&[u8]],
90 field: &SortField,
91 validate_utf8: bool,
92) -> Result<RunArray<R>, ArrowError> {
93 if rows.is_empty() {
94 let values = unsafe { converter.convert_raw(&mut [], validate_utf8) }?;
95 let run_ends_array = PrimitiveArray::<R>::try_new(ScalarBuffer::from(vec![]), None)?;
96 return RunArray::<R>::try_new(&run_ends_array, &values[0]);
97 }
98
99 let mut decoded_values = Vec::new();
101 let mut run_ends = Vec::new();
102 let mut unique_row_indices = Vec::new();
103
104 let mut decoded_data = Vec::new();
106 for (idx, row) in rows.iter_mut().enumerate() {
107 decoded_data.clear();
108 let consumed = variable::decode_blocks(row, field.options, |block| {
110 decoded_data.extend_from_slice(block);
111 });
112
113 if field.options.descending {
115 decoded_data.iter_mut().for_each(|b| *b = !*b);
116 }
117
118 *row = &row[consumed..];
120
121 let is_new_run =
123 idx == 0 || decoded_data != decoded_values[*unique_row_indices.last().unwrap()];
124
125 if is_new_run {
126 if idx > 0 {
128 run_ends.push(R::Native::usize_as(idx));
129 }
130 unique_row_indices.push(decoded_values.len());
131 let capacity = decoded_data.capacity();
132 decoded_values.push(std::mem::replace(
133 &mut decoded_data,
134 Vec::with_capacity(capacity),
135 ));
136 }
137 }
138 run_ends.push(R::Native::usize_as(rows.len()));
140
141 let mut unique_rows: Vec<&[u8]> = decoded_values.iter().map(|v| v.as_slice()).collect();
143 let values = if unique_rows.is_empty() {
144 unsafe { converter.convert_raw(&mut [], validate_utf8) }?
145 } else {
146 unsafe { converter.convert_raw(&mut unique_rows, validate_utf8) }?
147 };
148
149 let run_ends_array = PrimitiveArray::<R>::try_new(ScalarBuffer::from(run_ends), None)?;
151
152 RunArray::<R>::try_new(&run_ends_array, &values[0])
154}
155
#[cfg(test)]
mod tests {
    use crate::{RowConverter, Rows, SortField};
    use arrow_array::cast::AsArray;
    use arrow_array::types::{Int16Type, Int32Type, Int64Type, RunEndIndexType};
    use arrow_array::{Array, Int64Array, PrimitiveArray, RunArray, StringArray};
    use arrow_schema::{DataType, Field, SortOptions};
    use std::sync::Arc;

    /// Builds a `RunEndEncoded` data type with the given run-end type and nullable value type.
    fn ree_type(run_end_type: DataType, values_type: DataType) -> DataType {
        DataType::RunEndEncoded(
            Arc::new(Field::new("run_ends", run_end_type, false)),
            Arc::new(Field::new("values", values_type, true)),
        )
    }

    /// Builds a converter for a single `RunEndEncoded<Int32, Utf8>` column using `options`.
    fn string_ree_converter(options: SortOptions) -> RowConverter {
        RowConverter::new(vec![SortField::new_with_options(
            ree_type(DataType::Int32, DataType::Utf8),
            options,
        )])
        .unwrap()
    }

    /// Converts a single run array column to rows with `converter`.
    fn to_rows(converter: &RowConverter, array: &RunArray<Int32Type>) -> Rows {
        converter
            .convert_columns(&[Arc::new(array.clone())])
            .unwrap()
    }

    /// Converts `array` to rows and back again, asserting that the result equals the input.
    fn assert_roundtrip<R: RunEndIndexType>(
        array: &RunArray<R>,
        run_end_type: DataType,
        values_type: DataType,
        sort_options: Option<SortOptions>,
    ) {
        let data_type = ree_type(run_end_type, values_type);
        let sort_field = match sort_options {
            Some(options) => SortField::new_with_options(data_type, options),
            None => SortField::new(data_type),
        };
        let converter = RowConverter::new(vec![sort_field]).unwrap();

        let rows = converter
            .convert_columns(&[Arc::new(array.clone())])
            .unwrap();
        let arrays = converter.convert_rows(&rows).unwrap();
        let result = arrays[0].as_any().downcast_ref::<RunArray<R>>().unwrap();

        assert_eq!(array, result);
    }

    #[test]
    fn test_run_end_encoded_supports_datatype() {
        assert!(RowConverter::supports_datatype(&ree_type(
            DataType::Int32,
            DataType::Utf8
        )));
    }

    #[test]
    fn test_run_end_encoded_round_trip_int16_int64s() {
        let values = Int64Array::from(vec![100, 200, 100, 300]);
        let run_ends = vec![2, 3, 5, 6];
        let array: RunArray<Int16Type> =
            RunArray::try_new(&PrimitiveArray::from(run_ends), &values).unwrap();

        assert_roundtrip(&array, DataType::Int16, DataType::Int64, None);
    }

    #[test]
    fn test_run_end_encoded_round_trip_int32_int64s() {
        let values = Int64Array::from(vec![100, 200, 100, 300]);
        let run_ends = vec![2, 3, 5, 6];
        let array: RunArray<Int32Type> =
            RunArray::try_new(&PrimitiveArray::from(run_ends), &values).unwrap();

        assert_roundtrip(&array, DataType::Int32, DataType::Int64, None);
    }

    #[test]
    fn test_run_end_encoded_round_trip_int64_int64s() {
        let values = Int64Array::from(vec![100, 200, 100, 300]);
        let run_ends = vec![2, 3, 5, 6];
        let array: RunArray<Int64Type> =
            RunArray::try_new(&PrimitiveArray::from(run_ends), &values).unwrap();

        assert_roundtrip(&array, DataType::Int64, DataType::Int64, None);
    }

    #[test]
    fn test_run_end_encoded_round_trip_strings() {
        let array: RunArray<Int32Type> = vec!["b", "b", "a"].into_iter().collect();

        assert_roundtrip(&array, DataType::Int32, DataType::Utf8, None);
    }

    #[test]
    fn test_run_end_encoded_round_trip_strings_with_nulls() {
        let array: RunArray<Int32Type> = vec![Some("b"), Some("b"), None, Some("a")]
            .into_iter()
            .collect();

        assert_roundtrip(&array, DataType::Int32, DataType::Utf8, None);
    }

    #[test]
    fn test_run_end_encoded_ascending_descending_round_trip() {
        let values = StringArray::from(vec![Some("apple"), Some("banana"), Some("cherry")]);
        let run_ends = vec![2, 4, 6];
        let run_array: RunArray<Int32Type> =
            RunArray::try_new(&PrimitiveArray::from(run_ends), &values).unwrap();

        for descending in [false, true] {
            assert_roundtrip(
                &run_array,
                DataType::Int32,
                DataType::Utf8,
                Some(SortOptions {
                    descending,
                    nulls_first: true,
                }),
            );
        }
    }

    #[test]
    fn test_run_end_encoded_sort_configurations_basic() {
        let test_array: RunArray<Int32Type> = vec!["test"].into_iter().collect();

        for descending in [false, true] {
            assert_roundtrip(
                &test_array,
                DataType::Int32,
                DataType::Utf8,
                Some(SortOptions {
                    descending,
                    nulls_first: true,
                }),
            );
        }
    }

    #[test]
    fn test_run_end_encoded_nulls_first_last_configurations() {
        // Include a null so that the nulls_first setting actually affects the encoding.
        let array: RunArray<Int32Type> = vec![Some("simple"), Some("simple"), None]
            .into_iter()
            .collect();

        for nulls_first in [true, false] {
            assert_roundtrip(
                &array,
                DataType::Int32,
                DataType::Utf8,
                Some(SortOptions {
                    descending: false,
                    nulls_first,
                }),
            );
        }
    }

    #[test]
    fn test_run_end_encoded_row_consumption() {
        let array: RunArray<Int32Type> =
            vec!["a", "a", "b", "b", "b", "c"].into_iter().collect();
        let string_array = StringArray::from(vec!["x", "y", "z", "w", "u", "v"]);

        // The REE column is followed by a second column. Decoding must consume exactly the
        // REE bytes of each row, or the trailing string column decodes incorrectly.
        let multi_converter = RowConverter::new(vec![
            SortField::new(ree_type(DataType::Int32, DataType::Utf8)),
            SortField::new(DataType::Utf8),
        ])
        .unwrap();

        let multi_rows = multi_converter
            .convert_columns(&[Arc::new(array.clone()), Arc::new(string_array.clone())])
            .unwrap();
        let arrays = multi_converter.convert_rows(&multi_rows).unwrap();

        let result_ree = arrays[0]
            .as_any()
            .downcast_ref::<RunArray<Int32Type>>()
            .unwrap();
        let result_string = arrays[1].as_any().downcast_ref::<StringArray>().unwrap();

        assert_eq!(result_ree.values().as_ref(), array.values().as_ref());
        assert_eq!(result_ree.run_ends().values(), array.run_ends().values());
        assert_eq!(*result_string, string_array);
    }

    #[test]
    fn test_run_end_encoded_sorting_behavior() {
        let array1: RunArray<Int32Type> = vec!["apple", "apple"].into_iter().collect();
        let array2: RunArray<Int32Type> = vec!["banana", "banana"].into_iter().collect();
        let array3: RunArray<Int32Type> = vec!["cherry", "cherry"].into_iter().collect();

        let converter_asc = RowConverter::new(vec![SortField::new(ree_type(
            DataType::Int32,
            DataType::Utf8,
        ))])
        .unwrap();

        let rows1_asc = to_rows(&converter_asc, &array1);
        let rows2_asc = to_rows(&converter_asc, &array2);
        let rows3_asc = to_rows(&converter_asc, &array3);

        assert!(
            rows1_asc.row(0) < rows2_asc.row(0),
            "apple should come before banana in ascending order"
        );
        assert!(
            rows2_asc.row(0) < rows3_asc.row(0),
            "banana should come before cherry in ascending order"
        );
        assert!(
            rows1_asc.row(0) < rows3_asc.row(0),
            "apple should come before cherry in ascending order"
        );

        let converter_desc = string_ree_converter(SortOptions {
            descending: true,
            nulls_first: true,
        });

        let rows1_desc = to_rows(&converter_desc, &array1);
        let rows2_desc = to_rows(&converter_desc, &array2);
        let rows3_desc = to_rows(&converter_desc, &array3);

        assert!(
            rows3_desc.row(0) < rows2_desc.row(0),
            "cherry should come before banana in descending order (byte-wise)"
        );
        assert!(
            rows2_desc.row(0) < rows1_desc.row(0),
            "banana should come before apple in descending order (byte-wise)"
        );
        assert!(
            rows3_desc.row(0) < rows1_desc.row(0),
            "cherry should come before apple in descending order (byte-wise)"
        );
    }

    #[test]
    fn test_run_end_encoded_null_sorting() {
        let array_with_nulls: RunArray<Int32Type> = vec![None, None].into_iter().collect();
        let array_with_values: RunArray<Int32Type> = vec!["apple", "apple"].into_iter().collect();

        let converter_nulls_first = string_ree_converter(SortOptions {
            descending: false,
            nulls_first: true,
        });
        let rows_nulls = to_rows(&converter_nulls_first, &array_with_nulls);
        let rows_values = to_rows(&converter_nulls_first, &array_with_values);

        assert!(
            rows_nulls.row(0) < rows_values.row(0),
            "nulls should come before values when nulls_first=true"
        );

        let converter_nulls_last = string_ree_converter(SortOptions {
            descending: false,
            nulls_first: false,
        });
        let rows_nulls_last = to_rows(&converter_nulls_last, &array_with_nulls);
        let rows_values_last = to_rows(&converter_nulls_last, &array_with_values);

        assert!(
            rows_values_last.row(0) < rows_nulls_last.row(0),
            "values should come before nulls when nulls_first=false"
        );
    }

    #[test]
    fn test_run_end_encoded_mixed_sorting() {
        let array1: RunArray<Int32Type> = vec![Some("apple"), None].into_iter().collect();
        let array2: RunArray<Int32Type> = vec![None, Some("banana")].into_iter().collect();
        let array3: RunArray<Int32Type> =
            vec![Some("cherry"), Some("cherry")].into_iter().collect();

        let converter = string_ree_converter(SortOptions {
            descending: false,
            nulls_first: true,
        });

        let rows1 = to_rows(&converter, &array1);
        let rows2 = to_rows(&converter, &array2);
        let rows3 = to_rows(&converter, &array3);

        assert!(rows2.row(0) < rows1.row(0), "null should come before apple");
        assert!(
            rows1.row(0) < rows3.row(0),
            "apple should come before cherry"
        );

        assert!(
            rows1.row(1) < rows2.row(1),
            "null should come before banana"
        );
        assert!(
            rows2.row(1) < rows3.row(1),
            "banana should come before cherry"
        );
    }

    #[test]
    fn test_run_end_encoded_empty() {
        let values: Vec<&str> = vec![];
        let array: RunArray<Int32Type> = values.into_iter().collect();

        let converter = RowConverter::new(vec![SortField::new(ree_type(
            DataType::Int32,
            DataType::Utf8,
        ))])
        .unwrap();

        let rows = to_rows(&converter, &array);
        assert_eq!(rows.num_rows(), 0);

        let arrays = converter.convert_rows(&rows).unwrap();
        assert_eq!(arrays.len(), 1);
        let result_ree = arrays[0].as_run::<Int32Type>();
        assert_eq!(result_ree.len(), 0);
    }

    #[test]
    fn test_run_end_encoded_round_trip_sliced() {
        let values = Int64Array::from(vec![100, 200, 100, 300]);
        let run_ends = vec![2, 3, 5, 6];
        let array: RunArray<Int16Type> =
            RunArray::try_new(&PrimitiveArray::from(run_ends), &values).unwrap();
        let array = array.slice(2, 3);

        assert_roundtrip(&array, DataType::Int16, DataType::Int64, None);
    }
}