1use crate::{RowConverter, Rows, SortField, variable};
19use arrow_array::types::RunEndIndexType;
20use arrow_array::{PrimitiveArray, RunArray};
21use arrow_buffer::{ArrowNativeType, ScalarBuffer};
22use arrow_schema::{ArrowError, SortOptions};
23
24pub fn compute_lengths<R: RunEndIndexType>(
26 lengths: &mut [usize],
27 rows: &Rows,
28 array: &RunArray<R>,
29) {
30 let run_ends = array.run_ends().values();
31 let mut logical_start = 0;
32
33 for (physical_idx, &run_end) in run_ends.iter().enumerate() {
35 let logical_end = run_end.as_usize();
36 let row_len = rows.row_len(physical_idx);
37 let encoded_len = variable::padded_length(Some(row_len));
38
39 for length in &mut lengths[logical_start..logical_end] {
41 *length += encoded_len;
42 }
43
44 logical_start = logical_end;
45 }
46}
47
48pub fn encode<R: RunEndIndexType>(
52 data: &mut [u8],
53 offsets: &mut [usize],
54 rows: &Rows,
55 opts: SortOptions,
56 array: &RunArray<R>,
57) {
58 let run_ends = array.run_ends();
59
60 let mut logical_idx = 0;
61 let mut offset_idx = 1; for physical_idx in 0..run_ends.values().len() {
65 let run_end = run_ends.values()[physical_idx].as_usize();
66
67 while logical_idx < run_end && offset_idx < offsets.len() {
69 let offset = &mut offsets[offset_idx];
70 let out = &mut data[*offset..];
71
72 let row = rows.row(physical_idx);
74 let bytes_written = variable::encode_one(out, Some(row.data), opts);
75 *offset += bytes_written;
76
77 logical_idx += 1;
78 offset_idx += 1;
79 }
80
81 if offset_idx >= offsets.len() {
83 break;
84 }
85 }
86}
87
88pub unsafe fn decode<R: RunEndIndexType>(
94 converter: &RowConverter,
95 rows: &mut [&[u8]],
96 field: &SortField,
97 validate_utf8: bool,
98) -> Result<RunArray<R>, ArrowError> {
99 if rows.is_empty() {
100 let values = unsafe { converter.convert_raw(&mut [], validate_utf8) }?;
101 let run_ends_array = PrimitiveArray::<R>::try_new(ScalarBuffer::from(vec![]), None)?;
102 return RunArray::<R>::try_new(&run_ends_array, &values[0]);
103 }
104
105 let mut decoded_values = Vec::new();
107 let mut run_ends = Vec::new();
108 let mut unique_row_indices = Vec::new();
109
110 let mut decoded_data = Vec::new();
112 for (idx, row) in rows.iter_mut().enumerate() {
113 decoded_data.clear();
114 let consumed = variable::decode_blocks(row, field.options, |block| {
116 decoded_data.extend_from_slice(block);
117 });
118
119 if field.options.descending {
121 decoded_data.iter_mut().for_each(|b| *b = !*b);
122 }
123
124 *row = &row[consumed..];
126
127 let is_new_run =
129 idx == 0 || decoded_data != decoded_values[*unique_row_indices.last().unwrap()];
130
131 if is_new_run {
132 if idx > 0 {
134 run_ends.push(R::Native::usize_as(idx));
135 }
136 unique_row_indices.push(decoded_values.len());
137 let capacity = decoded_data.capacity();
138 decoded_values.push(std::mem::replace(
139 &mut decoded_data,
140 Vec::with_capacity(capacity),
141 ));
142 }
143 }
144 run_ends.push(R::Native::usize_as(rows.len()));
146
147 let mut unique_rows: Vec<&[u8]> = decoded_values.iter().map(|v| v.as_slice()).collect();
149 let values = if unique_rows.is_empty() {
150 unsafe { converter.convert_raw(&mut [], validate_utf8) }?
151 } else {
152 unsafe { converter.convert_raw(&mut unique_rows, validate_utf8) }?
153 };
154
155 let run_ends_array = PrimitiveArray::<R>::try_new(ScalarBuffer::from(run_ends), None)?;
157
158 RunArray::<R>::try_new(&run_ends_array, &values[0])
160}
161
#[cfg(test)]
mod tests {
    use crate::{RowConverter, SortField};
    use arrow_array::cast::AsArray;
    use arrow_array::types::{Int16Type, Int32Type, Int64Type, RunEndIndexType};
    use arrow_array::{Array, Int64Array, PrimitiveArray, RunArray, StringArray};
    use arrow_schema::{DataType, SortOptions};
    use std::sync::Arc;

    /// Builds a run-end encoded type with non-nullable run ends and nullable values.
    fn ree_type(run_end_type: DataType, values_type: DataType) -> DataType {
        DataType::RunEndEncoded(
            Arc::new(arrow_schema::Field::new("run_ends", run_end_type, false)),
            Arc::new(arrow_schema::Field::new("values", values_type, true)),
        )
    }

    /// Run-end encoded strings with `Int32` run ends, the type most tests use.
    fn ree_utf8() -> DataType {
        ree_type(DataType::Int32, DataType::Utf8)
    }

    /// Builds a single-column converter, using the default sort options when none are given.
    fn single_field_converter(data_type: DataType, options: Option<SortOptions>) -> RowConverter {
        let field = match options {
            Some(options) => SortField::new_with_options(data_type, options),
            None => SortField::new(data_type),
        };
        RowConverter::new(vec![field]).unwrap()
    }

    /// Encodes a single run-end encoded string column into rows.
    fn to_rows(converter: &RowConverter, array: &RunArray<Int32Type>) -> crate::Rows {
        converter
            .convert_columns(&[Arc::new(array.clone())])
            .unwrap()
    }

    /// Encodes `array` to rows and back, asserting the decoded array equals the input.
    fn assert_roundtrip<R: RunEndIndexType>(
        array: &RunArray<R>,
        run_end_type: DataType,
        values_type: DataType,
        sort_options: Option<SortOptions>,
    ) {
        let converter = single_field_converter(ree_type(run_end_type, values_type), sort_options);

        let encoded = converter
            .convert_columns(&[Arc::new(array.clone())])
            .unwrap();
        let decoded = converter.convert_rows(&encoded).unwrap();

        let result = decoded[0].as_any().downcast_ref::<RunArray<R>>().unwrap();
        assert_eq!(array, result);
    }

    #[test]
    fn test_run_end_encoded_supports_datatype() {
        assert!(RowConverter::supports_datatype(&ree_utf8()));
    }

    #[test]
    fn test_run_end_encoded_round_trip_int16_int64s() {
        let run_ends = PrimitiveArray::<Int16Type>::from(vec![2, 3, 5, 6]);
        let values = Int64Array::from(vec![100, 200, 100, 300]);
        let array: RunArray<Int16Type> = RunArray::try_new(&run_ends, &values).unwrap();

        assert_roundtrip(&array, DataType::Int16, DataType::Int64, None);
    }

    #[test]
    fn test_run_end_encoded_round_trip_int32_int64s() {
        let run_ends = PrimitiveArray::<Int32Type>::from(vec![2, 3, 5, 6]);
        let values = Int64Array::from(vec![100, 200, 100, 300]);
        let array: RunArray<Int32Type> = RunArray::try_new(&run_ends, &values).unwrap();

        assert_roundtrip(&array, DataType::Int32, DataType::Int64, None);
    }

    #[test]
    fn test_run_end_encoded_round_trip_int64_int64s() {
        let run_ends = PrimitiveArray::<Int64Type>::from(vec![2, 3, 5, 6]);
        let values = Int64Array::from(vec![100, 200, 100, 300]);
        let array: RunArray<Int64Type> = RunArray::try_new(&run_ends, &values).unwrap();

        assert_roundtrip(&array, DataType::Int64, DataType::Int64, None);
    }

    #[test]
    fn test_run_end_encoded_round_trip_strings() {
        let array: RunArray<Int32Type> = vec!["b", "b", "a"].into_iter().collect();

        assert_roundtrip(&array, DataType::Int32, DataType::Utf8, None);
    }

    #[test]
    fn test_run_end_encoded_round_trip_strings_with_nulls() {
        let array: RunArray<Int32Type> = vec![Some("b"), Some("b"), None, Some("a")]
            .into_iter()
            .collect();

        assert_roundtrip(&array, DataType::Int32, DataType::Utf8, None);
    }

    #[test]
    fn test_run_end_encoded_ascending_descending_round_trip() {
        let values = arrow_array::StringArray::from(vec![
            Some("apple"),
            Some("banana"),
            Some("cherry"),
        ]);
        let run_ends = arrow_array::PrimitiveArray::from(vec![2, 4, 6]);
        let run_array: RunArray<Int32Type> = RunArray::try_new(&run_ends, &values).unwrap();

        // Ascending first, then descending; nulls are placed first in both cases.
        for descending in [false, true] {
            assert_roundtrip(
                &run_array,
                DataType::Int32,
                DataType::Utf8,
                Some(SortOptions {
                    descending,
                    nulls_first: true,
                }),
            );
        }
    }

    #[test]
    fn test_run_end_encoded_sort_configurations_basic() {
        let test_array: RunArray<Int32Type> = vec!["test"].into_iter().collect();

        // Ascending first, then descending; nulls are placed first in both cases.
        for descending in [false, true] {
            assert_roundtrip(
                &test_array,
                DataType::Int32,
                DataType::Utf8,
                Some(SortOptions {
                    descending,
                    nulls_first: true,
                }),
            );
        }
    }

    #[test]
    fn test_run_end_encoded_nulls_first_last_configurations() {
        let simple_array: RunArray<Int32Type> = vec!["simple"].into_iter().collect();

        // Both null placements must decode to an array of the original length.
        for nulls_first in [true, false] {
            let converter = single_field_converter(
                ree_utf8(),
                Some(SortOptions {
                    descending: false,
                    nulls_first,
                }),
            );

            let rows = to_rows(&converter, &simple_array);
            let arrays = converter.convert_rows(&rows).unwrap();
            let result = arrays[0]
                .as_any()
                .downcast_ref::<RunArray<Int32Type>>()
                .unwrap();

            assert_eq!(simple_array.len(), result.len());
        }
    }

    #[test]
    fn test_run_end_encoded_row_consumption() {
        let ree: RunArray<Int32Type> = vec!["a", "a", "b", "b", "b", "c"].into_iter().collect();
        let strings = StringArray::from(vec!["x", "y", "z", "w", "u", "v"]);

        // A trailing string column checks that decoding the run-end encoded
        // column leaves the following column decodable.
        let converter = RowConverter::new(vec![
            SortField::new(ree_utf8()),
            SortField::new(DataType::Utf8),
        ])
        .unwrap();

        let rows = converter
            .convert_columns(&[Arc::new(ree.clone()), Arc::new(strings.clone())])
            .unwrap();
        let decoded = converter.convert_rows(&rows).unwrap();

        let decoded_ree = decoded[0]
            .as_any()
            .downcast_ref::<RunArray<Int32Type>>()
            .unwrap();
        let decoded_strings = decoded[1].as_any().downcast_ref::<StringArray>().unwrap();

        assert_eq!(decoded_ree.values().as_ref(), ree.values().as_ref());
        assert_eq!(decoded_ree.run_ends().values(), ree.run_ends().values());
        assert_eq!(*decoded_strings, strings);
    }

    #[test]
    fn test_run_end_encoded_sorting_behavior() {
        let apple: RunArray<Int32Type> = vec!["apple", "apple"].into_iter().collect();
        let banana: RunArray<Int32Type> = vec!["banana", "banana"].into_iter().collect();
        let cherry: RunArray<Int32Type> = vec!["cherry", "cherry"].into_iter().collect();

        let ascending = single_field_converter(ree_utf8(), None);
        let apple_asc = to_rows(&ascending, &apple);
        let banana_asc = to_rows(&ascending, &banana);
        let cherry_asc = to_rows(&ascending, &cherry);

        assert!(
            apple_asc.row(0) < banana_asc.row(0),
            "apple should come before banana in ascending order"
        );
        assert!(
            banana_asc.row(0) < cherry_asc.row(0),
            "banana should come before cherry in ascending order"
        );
        assert!(
            apple_asc.row(0) < cherry_asc.row(0),
            "apple should come before cherry in ascending order"
        );

        let descending = single_field_converter(
            ree_utf8(),
            Some(arrow_schema::SortOptions {
                descending: true,
                nulls_first: true,
            }),
        );
        let apple_desc = to_rows(&descending, &apple);
        let banana_desc = to_rows(&descending, &banana);
        let cherry_desc = to_rows(&descending, &cherry);

        assert!(
            cherry_desc.row(0) < banana_desc.row(0),
            "cherry should come before banana in descending order (byte-wise)"
        );
        assert!(
            banana_desc.row(0) < apple_desc.row(0),
            "banana should come before apple in descending order (byte-wise)"
        );
        assert!(
            cherry_desc.row(0) < apple_desc.row(0),
            "cherry should come before apple in descending order (byte-wise)"
        );
    }

    #[test]
    fn test_run_end_encoded_null_sorting() {
        let array_with_nulls: RunArray<Int32Type> = vec![None, None].into_iter().collect();
        let array_with_values: RunArray<Int32Type> = vec!["apple", "apple"].into_iter().collect();

        let nulls_first = single_field_converter(
            ree_utf8(),
            Some(arrow_schema::SortOptions {
                descending: false,
                nulls_first: true,
            }),
        );
        let null_rows = to_rows(&nulls_first, &array_with_nulls);
        let value_rows = to_rows(&nulls_first, &array_with_values);

        assert!(
            null_rows.row(0) < value_rows.row(0),
            "nulls should come before values when nulls_first=true"
        );

        let nulls_last = single_field_converter(
            ree_utf8(),
            Some(arrow_schema::SortOptions {
                descending: false,
                nulls_first: false,
            }),
        );
        let null_rows_last = to_rows(&nulls_last, &array_with_nulls);
        let value_rows_last = to_rows(&nulls_last, &array_with_values);

        assert!(
            value_rows_last.row(0) < null_rows_last.row(0),
            "values should come before nulls when nulls_first=false"
        );
    }

    #[test]
    fn test_run_end_encoded_mixed_sorting() {
        let array1: RunArray<Int32Type> = vec![Some("apple"), None].into_iter().collect();
        let array2: RunArray<Int32Type> = vec![None, Some("banana")].into_iter().collect();
        let array3: RunArray<Int32Type> =
            vec![Some("cherry"), Some("cherry")].into_iter().collect();

        let converter = single_field_converter(
            ree_utf8(),
            Some(arrow_schema::SortOptions {
                descending: false,
                nulls_first: true,
            }),
        );

        let rows1 = to_rows(&converter, &array1);
        let rows2 = to_rows(&converter, &array2);
        let rows3 = to_rows(&converter, &array3);

        // First logical row of each array: null, apple, cherry.
        assert!(rows2.row(0) < rows1.row(0), "null should come before apple");
        assert!(
            rows1.row(0) < rows3.row(0),
            "apple should come before cherry"
        );

        // Second logical row of each array: null, banana, cherry.
        assert!(
            rows1.row(1) < rows2.row(1),
            "null should come before banana"
        );
        assert!(
            rows2.row(1) < rows3.row(1),
            "banana should come before cherry"
        );
    }

    #[test]
    fn test_run_end_encoded_empty() {
        let no_values: Vec<&str> = vec![];
        let array: RunArray<Int32Type> = no_values.into_iter().collect();

        let converter = single_field_converter(ree_utf8(), None);

        let rows = converter.convert_columns(&[Arc::new(array)]).unwrap();
        assert_eq!(rows.num_rows(), 0);

        // Decoding zero rows must still yield one (empty) column.
        let arrays = converter.convert_rows(&rows).unwrap();
        assert_eq!(arrays.len(), 1);

        let result_ree = arrays[0].as_run::<Int32Type>();
        assert_eq!(result_ree.len(), 0);
    }
}