1use crate::{variable, RowConverter, Rows, SortField};
19use arrow_array::types::RunEndIndexType;
20use arrow_array::{PrimitiveArray, RunArray};
21use arrow_buffer::{ArrowNativeType, ScalarBuffer};
22use arrow_schema::{ArrowError, SortOptions};
23
24pub fn compute_lengths<R: RunEndIndexType>(
26 lengths: &mut [usize],
27 rows: &Rows,
28 array: &RunArray<R>,
29) {
30 let run_ends = array.run_ends().values();
31 let mut logical_start = 0;
32
33 for (physical_idx, &run_end) in run_ends.iter().enumerate() {
35 let logical_end = run_end.as_usize();
36 let row = rows.row(physical_idx);
37 let encoded_len = variable::encoded_len(Some(row.data));
38
39 for length in &mut lengths[logical_start..logical_end] {
41 *length += encoded_len;
42 }
43
44 logical_start = logical_end;
45 }
46}
47
48pub fn encode<R: RunEndIndexType>(
52 data: &mut [u8],
53 offsets: &mut [usize],
54 rows: &Rows,
55 opts: SortOptions,
56 array: &RunArray<R>,
57) {
58 let run_ends = array.run_ends();
59
60 let mut logical_idx = 0;
61 let mut offset_idx = 1; for physical_idx in 0..run_ends.values().len() {
65 let run_end = run_ends.values()[physical_idx].as_usize();
66
67 while logical_idx < run_end && offset_idx < offsets.len() {
69 let offset = &mut offsets[offset_idx];
70 let out = &mut data[*offset..];
71
72 let row = rows.row(physical_idx);
74 let bytes_written = variable::encode_one(out, Some(row.data), opts);
75 *offset += bytes_written;
76
77 logical_idx += 1;
78 offset_idx += 1;
79 }
80
81 if offset_idx >= offsets.len() {
83 break;
84 }
85 }
86}
87
88pub unsafe fn decode<R: RunEndIndexType>(
94 converter: &RowConverter,
95 rows: &mut [&[u8]],
96 field: &SortField,
97 validate_utf8: bool,
98) -> Result<RunArray<R>, ArrowError> {
99 if rows.is_empty() {
100 let values = converter.convert_raw(&mut [], validate_utf8)?;
101 let run_ends_array = PrimitiveArray::<R>::new(ScalarBuffer::from(vec![]), None);
102 return RunArray::<R>::try_new(&run_ends_array, &values[0]);
103 }
104
105 let mut decoded_values = Vec::new();
107 let mut run_ends = Vec::new();
108 let mut unique_row_indices = Vec::new();
109
110 let mut decoded_data = Vec::new();
112 for (idx, row) in rows.iter_mut().enumerate() {
113 decoded_data.clear();
114 let consumed = variable::decode_blocks(row, field.options, |block| {
116 decoded_data.extend_from_slice(block);
117 });
118
119 if field.options.descending {
121 decoded_data.iter_mut().for_each(|b| *b = !*b);
122 }
123
124 *row = &row[consumed..];
126
127 let is_new_run =
129 idx == 0 || decoded_data != decoded_values[*unique_row_indices.last().unwrap()];
130
131 if is_new_run {
132 if idx > 0 {
134 run_ends.push(R::Native::usize_as(idx));
135 }
136 unique_row_indices.push(decoded_values.len());
137 decoded_values.push(decoded_data.clone());
138 }
139 }
140 run_ends.push(R::Native::usize_as(rows.len()));
142
143 let mut unique_rows: Vec<&[u8]> = decoded_values.iter().map(|v| v.as_slice()).collect();
145 let values = if unique_rows.is_empty() {
146 converter.convert_raw(&mut [], validate_utf8)?
147 } else {
148 converter.convert_raw(&mut unique_rows, validate_utf8)?
149 };
150
151 let run_ends_array = PrimitiveArray::<R>::new(ScalarBuffer::from(run_ends), None);
153
154 RunArray::<R>::try_new(&run_ends_array, &values[0])
156}
157
#[cfg(test)]
mod tests {
    use crate::{RowConverter, Rows, SortField};
    use arrow_array::cast::AsArray;
    use arrow_array::types::{Int16Type, Int32Type, Int64Type, RunEndIndexType};
    use arrow_array::{Array, Int64Array, PrimitiveArray, RunArray, StringArray};
    use arrow_schema::{DataType, Field, SortOptions};
    use std::sync::Arc;

    /// Builds the `RunEndEncoded` data type shared by these tests: a
    /// non-nullable "run_ends" field and a nullable "values" field.
    fn ree_type(run_end_type: DataType, values_type: DataType) -> DataType {
        DataType::RunEndEncoded(
            Arc::new(Field::new("run_ends", run_end_type, false)),
            Arc::new(Field::new("values", values_type, true)),
        )
    }

    /// Builds a sort field for `data_type`, using the default options when
    /// `options` is `None`.
    fn sort_field(data_type: DataType, options: Option<SortOptions>) -> SortField {
        match options {
            Some(options) => SortField::new_with_options(data_type, options),
            None => SortField::new(data_type),
        }
    }

    /// Builds a single-column converter for Int32 run ends over Utf8 values.
    fn utf8_converter(options: Option<SortOptions>) -> RowConverter {
        let field = sort_field(ree_type(DataType::Int32, DataType::Utf8), options);
        RowConverter::new(vec![field]).unwrap()
    }

    /// Converts a single run array into rows with `converter`.
    fn encode_single<R: RunEndIndexType>(converter: &RowConverter, array: &RunArray<R>) -> Rows {
        converter
            .convert_columns(&[Arc::new(array.clone())])
            .unwrap()
    }

    /// Encodes `array` to rows and back, asserting the result equals the input.
    fn assert_roundtrip<R: RunEndIndexType>(
        array: &RunArray<R>,
        run_end_type: DataType,
        values_type: DataType,
        sort_options: Option<SortOptions>,
    ) {
        let field = sort_field(ree_type(run_end_type, values_type), sort_options);
        let converter = RowConverter::new(vec![field]).unwrap();

        let rows = encode_single(&converter, array);
        let decoded = converter.convert_rows(&rows).unwrap();
        let result = decoded[0].as_any().downcast_ref::<RunArray<R>>().unwrap();

        assert_eq!(array, result);
    }

    #[test]
    fn test_run_end_encoded_supports_datatype() {
        let data_type = ree_type(DataType::Int32, DataType::Utf8);
        assert!(RowConverter::supports_datatype(&data_type));
    }

    #[test]
    fn test_run_end_encoded_round_trip_int16_int64s() {
        let values = Int64Array::from(vec![100, 200, 100, 300]);
        let run_ends = PrimitiveArray::<Int16Type>::from(vec![2, 3, 5, 6]);
        let array = RunArray::try_new(&run_ends, &values).unwrap();

        assert_roundtrip(&array, DataType::Int16, DataType::Int64, None);
    }

    #[test]
    fn test_run_end_encoded_round_trip_int32_int64s() {
        let values = Int64Array::from(vec![100, 200, 100, 300]);
        let run_ends = PrimitiveArray::<Int32Type>::from(vec![2, 3, 5, 6]);
        let array = RunArray::try_new(&run_ends, &values).unwrap();

        assert_roundtrip(&array, DataType::Int32, DataType::Int64, None);
    }

    #[test]
    fn test_run_end_encoded_round_trip_int64_int64s() {
        let values = Int64Array::from(vec![100, 200, 100, 300]);
        let run_ends = PrimitiveArray::<Int64Type>::from(vec![2, 3, 5, 6]);
        let array = RunArray::try_new(&run_ends, &values).unwrap();

        assert_roundtrip(&array, DataType::Int64, DataType::Int64, None);
    }

    #[test]
    fn test_run_end_encoded_round_trip_strings() {
        let array: RunArray<Int32Type> = vec!["b", "b", "a"].into_iter().collect();

        assert_roundtrip(&array, DataType::Int32, DataType::Utf8, None);
    }

    #[test]
    fn test_run_end_encoded_round_trip_strings_with_nulls() {
        let array: RunArray<Int32Type> = vec![Some("b"), Some("b"), None, Some("a")]
            .into_iter()
            .collect();

        assert_roundtrip(&array, DataType::Int32, DataType::Utf8, None);
    }

    #[test]
    fn test_run_end_encoded_ascending_descending_round_trip() {
        let values = StringArray::from(vec![Some("apple"), Some("banana"), Some("cherry")]);
        let run_ends = PrimitiveArray::<Int32Type>::from(vec![2, 4, 6]);
        let run_array: RunArray<Int32Type> = RunArray::try_new(&run_ends, &values).unwrap();

        // Ascending first, then descending, both with nulls first.
        for descending in [false, true] {
            assert_roundtrip(
                &run_array,
                DataType::Int32,
                DataType::Utf8,
                Some(SortOptions {
                    descending,
                    nulls_first: true,
                }),
            );
        }
    }

    #[test]
    fn test_run_end_encoded_sort_configurations_basic() {
        let test_array: RunArray<Int32Type> = vec!["test"].into_iter().collect();

        // Ascending first, then descending, both with nulls first.
        for descending in [false, true] {
            assert_roundtrip(
                &test_array,
                DataType::Int32,
                DataType::Utf8,
                Some(SortOptions {
                    descending,
                    nulls_first: true,
                }),
            );
        }
    }

    #[test]
    fn test_run_end_encoded_nulls_first_last_configurations() {
        let simple_array: RunArray<Int32Type> = vec!["simple"].into_iter().collect();

        // Round-trip under nulls-first and then nulls-last; only the decoded
        // length is checked.
        for nulls_first in [true, false] {
            let converter = utf8_converter(Some(SortOptions {
                descending: false,
                nulls_first,
            }));

            let rows = encode_single(&converter, &simple_array);
            let arrays = converter.convert_rows(&rows).unwrap();
            let result = arrays[0]
                .as_any()
                .downcast_ref::<RunArray<Int32Type>>()
                .unwrap();

            assert_eq!(simple_array.len(), result.len());
        }
    }

    #[test]
    fn test_run_end_encoded_row_consumption() {
        let array: RunArray<Int32Type> = vec!["a", "a", "b", "b", "b", "c"].into_iter().collect();
        let string_array = StringArray::from(vec!["x", "y", "z", "w", "u", "v"]);

        // The run-end encoded column is followed by a plain Utf8 column, so the
        // second column only decodes correctly if the first one consumed
        // exactly its own bytes from every row.
        let multi_converter = RowConverter::new(vec![
            SortField::new(ree_type(DataType::Int32, DataType::Utf8)),
            SortField::new(DataType::Utf8),
        ])
        .unwrap();

        let multi_rows = multi_converter
            .convert_columns(&[Arc::new(array.clone()), Arc::new(string_array.clone())])
            .unwrap();

        let arrays = multi_converter.convert_rows(&multi_rows).unwrap();

        let result_ree = arrays[0]
            .as_any()
            .downcast_ref::<RunArray<Int32Type>>()
            .unwrap();
        let result_string = arrays[1].as_any().downcast_ref::<StringArray>().unwrap();

        assert_eq!(result_ree.values().as_ref(), array.values().as_ref());
        assert_eq!(result_ree.run_ends().values(), array.run_ends().values());
        assert_eq!(*result_string, string_array);
    }

    #[test]
    fn test_run_end_encoded_sorting_behavior() {
        let apple: RunArray<Int32Type> = vec!["apple", "apple"].into_iter().collect();
        let banana: RunArray<Int32Type> = vec!["banana", "banana"].into_iter().collect();
        let cherry: RunArray<Int32Type> = vec!["cherry", "cherry"].into_iter().collect();

        let converter_asc = utf8_converter(None);

        let apple_asc = encode_single(&converter_asc, &apple);
        let banana_asc = encode_single(&converter_asc, &banana);
        let cherry_asc = encode_single(&converter_asc, &cherry);

        assert!(
            apple_asc.row(0) < banana_asc.row(0),
            "apple should come before banana in ascending order"
        );
        assert!(
            banana_asc.row(0) < cherry_asc.row(0),
            "banana should come before cherry in ascending order"
        );
        assert!(
            apple_asc.row(0) < cherry_asc.row(0),
            "apple should come before cherry in ascending order"
        );

        let converter_desc = utf8_converter(Some(SortOptions {
            descending: true,
            nulls_first: true,
        }));

        let apple_desc = encode_single(&converter_desc, &apple);
        let banana_desc = encode_single(&converter_desc, &banana);
        let cherry_desc = encode_single(&converter_desc, &cherry);

        assert!(
            cherry_desc.row(0) < banana_desc.row(0),
            "cherry should come before banana in descending order (byte-wise)"
        );
        assert!(
            banana_desc.row(0) < apple_desc.row(0),
            "banana should come before apple in descending order (byte-wise)"
        );
        assert!(
            cherry_desc.row(0) < apple_desc.row(0),
            "cherry should come before apple in descending order (byte-wise)"
        );
    }

    #[test]
    fn test_run_end_encoded_null_sorting() {
        let array_with_nulls: RunArray<Int32Type> = vec![None, None].into_iter().collect();
        let array_with_values: RunArray<Int32Type> = vec!["apple", "apple"].into_iter().collect();

        let converter_nulls_first = utf8_converter(Some(SortOptions {
            descending: false,
            nulls_first: true,
        }));

        let rows_nulls = encode_single(&converter_nulls_first, &array_with_nulls);
        let rows_values = encode_single(&converter_nulls_first, &array_with_values);

        assert!(
            rows_nulls.row(0) < rows_values.row(0),
            "nulls should come before values when nulls_first=true"
        );

        let converter_nulls_last = utf8_converter(Some(SortOptions {
            descending: false,
            nulls_first: false,
        }));

        let rows_nulls_last = encode_single(&converter_nulls_last, &array_with_nulls);
        let rows_values_last = encode_single(&converter_nulls_last, &array_with_values);

        assert!(
            rows_values_last.row(0) < rows_nulls_last.row(0),
            "values should come before nulls when nulls_first=false"
        );
    }

    #[test]
    fn test_run_end_encoded_mixed_sorting() {
        let array1: RunArray<Int32Type> = vec![Some("apple"), None].into_iter().collect();
        let array2: RunArray<Int32Type> = vec![None, Some("banana")].into_iter().collect();
        let array3: RunArray<Int32Type> =
            vec![Some("cherry"), Some("cherry")].into_iter().collect();

        let converter = utf8_converter(Some(SortOptions {
            descending: false,
            nulls_first: true,
        }));

        let rows1 = encode_single(&converter, &array1);
        let rows2 = encode_single(&converter, &array2);
        let rows3 = encode_single(&converter, &array3);

        // Row 0 holds: apple (array1), null (array2), cherry (array3).
        assert!(rows2.row(0) < rows1.row(0), "null should come before apple");
        assert!(
            rows1.row(0) < rows3.row(0),
            "apple should come before cherry"
        );

        // Row 1 holds: null (array1), banana (array2), cherry (array3).
        assert!(
            rows1.row(1) < rows2.row(1),
            "null should come before banana"
        );
        assert!(
            rows2.row(1) < rows3.row(1),
            "banana should come before cherry"
        );
    }

    #[test]
    fn test_run_end_encoded_empty() {
        let values: Vec<&str> = vec![];
        let array: RunArray<Int32Type> = values.into_iter().collect();

        let converter = utf8_converter(None);

        let rows = converter.convert_columns(&[Arc::new(array)]).unwrap();
        assert_eq!(rows.num_rows(), 0);

        let arrays = converter.convert_rows(&rows).unwrap();
        assert_eq!(arrays.len(), 1);

        let result_ree = arrays[0].as_run::<Int32Type>();
        assert_eq!(result_ree.len(), 0);
    }
}