arrow_row/
run.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::{RowConverter, Rows, SortField, variable};
19use arrow_array::types::RunEndIndexType;
20use arrow_array::{PrimitiveArray, RunArray};
21use arrow_buffer::{ArrowNativeType, ScalarBuffer};
22use arrow_schema::{ArrowError, SortOptions};
23
24/// Computes the lengths of each row for a RunEndEncodedArray
25pub fn compute_lengths<R: RunEndIndexType>(
26    lengths: &mut [usize],
27    rows: &Rows,
28    array: &RunArray<R>,
29) {
30    let run_ends = array.run_ends().values();
31    let mut logical_start = 0;
32
33    // Iterate over each run and apply the same length to all logical positions in the run
34    for (physical_idx, &run_end) in run_ends.iter().enumerate() {
35        let logical_end = run_end.as_usize();
36        let row_len = rows.row_len(physical_idx);
37        let encoded_len = variable::padded_length(Some(row_len));
38
39        // Add the same length for all logical positions in this run
40        for length in &mut lengths[logical_start..logical_end] {
41            *length += encoded_len;
42        }
43
44        logical_start = logical_end;
45    }
46}
47
48/// Encodes the provided `RunEndEncodedArray` to `out` with the provided `SortOptions`
49///
50/// `rows` should contain the encoded values
51pub fn encode<R: RunEndIndexType>(
52    data: &mut [u8],
53    offsets: &mut [usize],
54    rows: &Rows,
55    opts: SortOptions,
56    array: &RunArray<R>,
57) {
58    let run_ends = array.run_ends();
59
60    let mut logical_idx = 0;
61    let mut offset_idx = 1; // Skip first offset
62
63    // Iterate over each run
64    for physical_idx in 0..run_ends.values().len() {
65        let run_end = run_ends.values()[physical_idx].as_usize();
66
67        // Process all elements in this run
68        while logical_idx < run_end && offset_idx < offsets.len() {
69            let offset = &mut offsets[offset_idx];
70            let out = &mut data[*offset..];
71
72            // Use variable-length encoding to make the data self-describing
73            let row = rows.row(physical_idx);
74            let bytes_written = variable::encode_one(out, Some(row.data), opts);
75            *offset += bytes_written;
76
77            logical_idx += 1;
78            offset_idx += 1;
79        }
80
81        // Break if we've processed all offsets
82        if offset_idx >= offsets.len() {
83            break;
84        }
85    }
86}
87
88/// Decodes a RunEndEncodedArray from `rows` with the provided `options`
89///
90/// # Safety
91///
92/// `rows` must contain valid data for the provided `converter`
93pub unsafe fn decode<R: RunEndIndexType>(
94    converter: &RowConverter,
95    rows: &mut [&[u8]],
96    field: &SortField,
97    validate_utf8: bool,
98) -> Result<RunArray<R>, ArrowError> {
99    if rows.is_empty() {
100        let values = unsafe { converter.convert_raw(&mut [], validate_utf8) }?;
101        let run_ends_array = PrimitiveArray::<R>::try_new(ScalarBuffer::from(vec![]), None)?;
102        return RunArray::<R>::try_new(&run_ends_array, &values[0]);
103    }
104
105    // Decode each row's REE data and collect the decoded values
106    let mut decoded_values = Vec::new();
107    let mut run_ends = Vec::new();
108    let mut unique_row_indices = Vec::new();
109
110    // Process each row to extract its REE data (following decode_binary pattern)
111    let mut decoded_data = Vec::new();
112    for (idx, row) in rows.iter_mut().enumerate() {
113        decoded_data.clear();
114        // Extract the decoded value data from this row
115        let consumed = variable::decode_blocks(row, field.options, |block| {
116            decoded_data.extend_from_slice(block);
117        });
118
119        // Handle bit inversion for descending sort (following decode_binary pattern)
120        if field.options.descending {
121            decoded_data.iter_mut().for_each(|b| *b = !*b);
122        }
123
124        // Update the row to point past the consumed REE data
125        *row = &row[consumed..];
126
127        // Check if this decoded value is the same as the previous one to identify runs
128        let is_new_run =
129            idx == 0 || decoded_data != decoded_values[*unique_row_indices.last().unwrap()];
130
131        if is_new_run {
132            // This is a new unique value - end the previous run if any
133            if idx > 0 {
134                run_ends.push(R::Native::usize_as(idx));
135            }
136            unique_row_indices.push(decoded_values.len());
137            let capacity = decoded_data.capacity();
138            decoded_values.push(std::mem::replace(
139                &mut decoded_data,
140                Vec::with_capacity(capacity),
141            ));
142        }
143    }
144    // Add the final run end
145    run_ends.push(R::Native::usize_as(rows.len()));
146
147    // Convert the unique decoded values using the row converter
148    let mut unique_rows: Vec<&[u8]> = decoded_values.iter().map(|v| v.as_slice()).collect();
149    let values = if unique_rows.is_empty() {
150        unsafe { converter.convert_raw(&mut [], validate_utf8) }?
151    } else {
152        unsafe { converter.convert_raw(&mut unique_rows, validate_utf8) }?
153    };
154
155    // Create run ends array
156    let run_ends_array = PrimitiveArray::<R>::try_new(ScalarBuffer::from(run_ends), None)?;
157
158    // Create the RunEndEncodedArray
159    RunArray::<R>::try_new(&run_ends_array, &values[0])
160}
161
162#[cfg(test)]
163mod tests {
164    use crate::{RowConverter, SortField};
165    use arrow_array::cast::AsArray;
166    use arrow_array::types::{Int16Type, Int32Type, Int64Type, RunEndIndexType};
167    use arrow_array::{Array, Int64Array, PrimitiveArray, RunArray, StringArray};
168    use arrow_schema::{DataType, SortOptions};
169    use std::sync::Arc;
170
171    fn assert_roundtrip<R: RunEndIndexType>(
172        array: &RunArray<R>,
173        run_end_type: DataType,
174        values_type: DataType,
175        sort_options: Option<SortOptions>,
176    ) {
177        let sort_field = if let Some(options) = sort_options {
178            SortField::new_with_options(
179                DataType::RunEndEncoded(
180                    Arc::new(arrow_schema::Field::new("run_ends", run_end_type, false)),
181                    Arc::new(arrow_schema::Field::new("values", values_type, true)),
182                ),
183                options,
184            )
185        } else {
186            SortField::new(DataType::RunEndEncoded(
187                Arc::new(arrow_schema::Field::new("run_ends", run_end_type, false)),
188                Arc::new(arrow_schema::Field::new("values", values_type, true)),
189            ))
190        };
191
192        let converter = RowConverter::new(vec![sort_field]).unwrap();
193
194        let rows = converter
195            .convert_columns(&[Arc::new(array.clone())])
196            .unwrap();
197
198        let arrays = converter.convert_rows(&rows).unwrap();
199        let result = arrays[0].as_any().downcast_ref::<RunArray<R>>().unwrap();
200
201        assert_eq!(array, result);
202    }
203
204    #[test]
205    fn test_run_end_encoded_supports_datatype() {
206        // Test that the RowConverter correctly supports run-end encoded arrays
207        assert!(RowConverter::supports_datatype(&DataType::RunEndEncoded(
208            Arc::new(arrow_schema::Field::new("run_ends", DataType::Int32, false)),
209            Arc::new(arrow_schema::Field::new("values", DataType::Utf8, true)),
210        )));
211    }
212
213    #[test]
214    fn test_run_end_encoded_round_trip_int16_int64s() {
215        // Test round-trip correctness for RunEndEncodedArray with Int64 values making sure it
216        // doesn't just work with eg. strings (which are all the other tests).
217
218        let values = Int64Array::from(vec![100, 200, 100, 300]);
219        let run_ends = vec![2, 3, 5, 6];
220        let array: RunArray<Int16Type> =
221            RunArray::try_new(&PrimitiveArray::from(run_ends), &values).unwrap();
222
223        assert_roundtrip(&array, DataType::Int16, DataType::Int64, None);
224    }
225
226    #[test]
227    fn test_run_end_encoded_round_trip_int32_int64s() {
228        // Test round-trip correctness for RunEndEncodedArray with Int64 values making sure it
229        // doesn't just work with eg. strings (which are all the other tests).
230
231        let values = Int64Array::from(vec![100, 200, 100, 300]);
232        let run_ends = vec![2, 3, 5, 6];
233        let array: RunArray<Int32Type> =
234            RunArray::try_new(&PrimitiveArray::from(run_ends), &values).unwrap();
235
236        assert_roundtrip(&array, DataType::Int32, DataType::Int64, None);
237    }
238
239    #[test]
240    fn test_run_end_encoded_round_trip_int64_int64s() {
241        // Test round-trip correctness for RunEndEncodedArray with Int64 values making sure it
242        // doesn't just work with eg. strings (which are all the other tests).
243
244        let values = Int64Array::from(vec![100, 200, 100, 300]);
245        let run_ends = vec![2, 3, 5, 6];
246        let array: RunArray<Int64Type> =
247            RunArray::try_new(&PrimitiveArray::from(run_ends), &values).unwrap();
248
249        assert_roundtrip(&array, DataType::Int64, DataType::Int64, None);
250    }
251
252    #[test]
253    fn test_run_end_encoded_round_trip_strings() {
254        // Test round-trip correctness for RunEndEncodedArray with strings
255
256        let array: RunArray<Int32Type> = vec!["b", "b", "a"].into_iter().collect();
257
258        assert_roundtrip(&array, DataType::Int32, DataType::Utf8, None);
259    }
260
261    #[test]
262    fn test_run_end_encoded_round_trip_strings_with_nulls() {
263        // Test round-trip correctness for RunEndEncodedArray with nulls
264
265        let array: RunArray<Int32Type> = vec![Some("b"), Some("b"), None, Some("a")]
266            .into_iter()
267            .collect();
268
269        assert_roundtrip(&array, DataType::Int32, DataType::Utf8, None);
270    }
271
272    #[test]
273    fn test_run_end_encoded_ascending_descending_round_trip() {
274        // Test round-trip correctness for ascending vs descending sort options
275
276        let values_asc =
277            arrow_array::StringArray::from(vec![Some("apple"), Some("banana"), Some("cherry")]);
278        let run_ends_asc = vec![2, 4, 6];
279        let run_array_asc: RunArray<Int32Type> = RunArray::try_new(
280            &arrow_array::PrimitiveArray::from(run_ends_asc),
281            &values_asc,
282        )
283        .unwrap();
284
285        // Test ascending order
286        assert_roundtrip(
287            &run_array_asc,
288            DataType::Int32,
289            DataType::Utf8,
290            Some(SortOptions {
291                descending: false,
292                nulls_first: true,
293            }),
294        );
295
296        // Test descending order
297        assert_roundtrip(
298            &run_array_asc,
299            DataType::Int32,
300            DataType::Utf8,
301            Some(SortOptions {
302                descending: true,
303                nulls_first: true,
304            }),
305        );
306    }
307
308    #[test]
309    fn test_run_end_encoded_sort_configurations_basic() {
310        // Test that different sort configurations work and can round-trip successfully
311
312        let test_array: RunArray<Int32Type> = vec!["test"].into_iter().collect();
313
314        // Test ascending order
315        assert_roundtrip(
316            &test_array,
317            DataType::Int32,
318            DataType::Utf8,
319            Some(SortOptions {
320                descending: false,
321                nulls_first: true,
322            }),
323        );
324
325        // Test descending order
326        assert_roundtrip(
327            &test_array,
328            DataType::Int32,
329            DataType::Utf8,
330            Some(SortOptions {
331                descending: true,
332                nulls_first: true,
333            }),
334        );
335    }
336
337    #[test]
338    fn test_run_end_encoded_nulls_first_last_configurations() {
339        // Test that nulls_first vs nulls_last configurations work
340
341        let simple_array: RunArray<Int32Type> = vec!["simple"].into_iter().collect();
342
343        let converter_nulls_first = RowConverter::new(vec![SortField::new_with_options(
344            DataType::RunEndEncoded(
345                Arc::new(arrow_schema::Field::new("run_ends", DataType::Int32, false)),
346                Arc::new(arrow_schema::Field::new("values", DataType::Utf8, true)),
347            ),
348            SortOptions {
349                descending: false,
350                nulls_first: true,
351            },
352        )])
353        .unwrap();
354
355        let converter_nulls_last = RowConverter::new(vec![SortField::new_with_options(
356            DataType::RunEndEncoded(
357                Arc::new(arrow_schema::Field::new("run_ends", DataType::Int32, false)),
358                Arc::new(arrow_schema::Field::new("values", DataType::Utf8, true)),
359            ),
360            SortOptions {
361                descending: false,
362                nulls_first: false,
363            },
364        )])
365        .unwrap();
366
367        // Test that both configurations can handle simple arrays
368        let rows_nulls_first = converter_nulls_first
369            .convert_columns(&[Arc::new(simple_array.clone())])
370            .unwrap();
371        let arrays_nulls_first = converter_nulls_first
372            .convert_rows(&rows_nulls_first)
373            .unwrap();
374        let result_nulls_first = arrays_nulls_first[0]
375            .as_any()
376            .downcast_ref::<RunArray<Int32Type>>()
377            .unwrap();
378
379        let rows_nulls_last = converter_nulls_last
380            .convert_columns(&[Arc::new(simple_array.clone())])
381            .unwrap();
382        let arrays_nulls_last = converter_nulls_last.convert_rows(&rows_nulls_last).unwrap();
383        let result_nulls_last = arrays_nulls_last[0]
384            .as_any()
385            .downcast_ref::<RunArray<Int32Type>>()
386            .unwrap();
387
388        // Both should successfully convert the simple array
389        assert_eq!(simple_array.len(), result_nulls_first.len());
390        assert_eq!(simple_array.len(), result_nulls_last.len());
391    }
392
393    #[test]
394    fn test_run_end_encoded_row_consumption() {
395        // This test verifies that ALL rows are properly consumed during decoding,
396        // not just the unique values. We test this by ensuring multi-column conversion
397        // works correctly - if rows aren't consumed properly, the second column would fail.
398
399        // Create a REE array with multiple runs
400        let array: RunArray<Int32Type> = vec!["a", "a", "b", "b", "b", "c"].into_iter().collect();
401        let string_array = StringArray::from(vec!["x", "y", "z", "w", "u", "v"]);
402
403        let multi_converter = RowConverter::new(vec![
404            SortField::new(DataType::RunEndEncoded(
405                Arc::new(arrow_schema::Field::new("run_ends", DataType::Int32, false)),
406                Arc::new(arrow_schema::Field::new("values", DataType::Utf8, true)),
407            )),
408            SortField::new(DataType::Utf8),
409        ])
410        .unwrap();
411
412        let multi_rows = multi_converter
413            .convert_columns(&[Arc::new(array.clone()), Arc::new(string_array.clone())])
414            .unwrap();
415
416        // Convert back - this will test that all rows are consumed properly
417        let arrays = multi_converter.convert_rows(&multi_rows).unwrap();
418
419        // Verify both columns round-trip correctly
420        let result_ree = arrays[0]
421            .as_any()
422            .downcast_ref::<RunArray<Int32Type>>()
423            .unwrap();
424
425        let result_string = arrays[1].as_any().downcast_ref::<StringArray>().unwrap();
426
427        // This should pass - both arrays should be identical to originals
428        assert_eq!(result_ree.values().as_ref(), array.values().as_ref());
429        assert_eq!(result_ree.run_ends().values(), array.run_ends().values());
430        assert_eq!(*result_string, string_array);
431    }
432
433    #[test]
434    fn test_run_end_encoded_sorting_behavior() {
435        // Test that the binary row encoding actually produces the correct sort order
436
437        // Create REE arrays with different values to test sorting
438        let array1: RunArray<Int32Type> = vec!["apple", "apple"].into_iter().collect();
439        let array2: RunArray<Int32Type> = vec!["banana", "banana"].into_iter().collect();
440        let array3: RunArray<Int32Type> = vec!["cherry", "cherry"].into_iter().collect();
441
442        // Test ascending sort
443        let converter_asc = RowConverter::new(vec![SortField::new(DataType::RunEndEncoded(
444            Arc::new(arrow_schema::Field::new("run_ends", DataType::Int32, false)),
445            Arc::new(arrow_schema::Field::new("values", DataType::Utf8, true)),
446        ))])
447        .unwrap();
448
449        let rows1_asc = converter_asc
450            .convert_columns(&[Arc::new(array1.clone())])
451            .unwrap();
452        let rows2_asc = converter_asc
453            .convert_columns(&[Arc::new(array2.clone())])
454            .unwrap();
455        let rows3_asc = converter_asc
456            .convert_columns(&[Arc::new(array3.clone())])
457            .unwrap();
458
459        // For ascending: apple < banana < cherry
460        // So row bytes should sort: rows1 < rows2 < rows3
461        assert!(
462            rows1_asc.row(0) < rows2_asc.row(0),
463            "apple should come before banana in ascending order"
464        );
465        assert!(
466            rows2_asc.row(0) < rows3_asc.row(0),
467            "banana should come before cherry in ascending order"
468        );
469        assert!(
470            rows1_asc.row(0) < rows3_asc.row(0),
471            "apple should come before cherry in ascending order"
472        );
473
474        // Test descending sort
475        let converter_desc = RowConverter::new(vec![SortField::new_with_options(
476            DataType::RunEndEncoded(
477                Arc::new(arrow_schema::Field::new("run_ends", DataType::Int32, false)),
478                Arc::new(arrow_schema::Field::new("values", DataType::Utf8, true)),
479            ),
480            arrow_schema::SortOptions {
481                descending: true,
482                nulls_first: true,
483            },
484        )])
485        .unwrap();
486
487        let rows1_desc = converter_desc
488            .convert_columns(&[Arc::new(array1.clone())])
489            .unwrap();
490        let rows2_desc = converter_desc
491            .convert_columns(&[Arc::new(array2.clone())])
492            .unwrap();
493        let rows3_desc = converter_desc
494            .convert_columns(&[Arc::new(array3.clone())])
495            .unwrap();
496
497        // For descending: cherry > banana > apple
498        // So row bytes should sort: rows3 < rows2 < rows1 (because byte comparison is ascending)
499        assert!(
500            rows3_desc.row(0) < rows2_desc.row(0),
501            "cherry should come before banana in descending order (byte-wise)"
502        );
503        assert!(
504            rows2_desc.row(0) < rows1_desc.row(0),
505            "banana should come before apple in descending order (byte-wise)"
506        );
507        assert!(
508            rows3_desc.row(0) < rows1_desc.row(0),
509            "cherry should come before apple in descending order (byte-wise)"
510        );
511    }
512
513    #[test]
514    fn test_run_end_encoded_null_sorting() {
515        // Test null handling in sort order
516
517        let array_with_nulls: RunArray<Int32Type> = vec![None, None].into_iter().collect();
518        let array_with_values: RunArray<Int32Type> = vec!["apple", "apple"].into_iter().collect();
519
520        // Test nulls_first = true
521        let converter_nulls_first = RowConverter::new(vec![SortField::new_with_options(
522            DataType::RunEndEncoded(
523                Arc::new(arrow_schema::Field::new("run_ends", DataType::Int32, false)),
524                Arc::new(arrow_schema::Field::new("values", DataType::Utf8, true)),
525            ),
526            arrow_schema::SortOptions {
527                descending: false,
528                nulls_first: true,
529            },
530        )])
531        .unwrap();
532
533        let rows_nulls = converter_nulls_first
534            .convert_columns(&[Arc::new(array_with_nulls.clone())])
535            .unwrap();
536        let rows_values = converter_nulls_first
537            .convert_columns(&[Arc::new(array_with_values.clone())])
538            .unwrap();
539
540        // nulls should come before values when nulls_first = true
541        assert!(
542            rows_nulls.row(0) < rows_values.row(0),
543            "nulls should come before values when nulls_first=true"
544        );
545
546        // Test nulls_first = false
547        let converter_nulls_last = RowConverter::new(vec![SortField::new_with_options(
548            DataType::RunEndEncoded(
549                Arc::new(arrow_schema::Field::new("run_ends", DataType::Int32, false)),
550                Arc::new(arrow_schema::Field::new("values", DataType::Utf8, true)),
551            ),
552            arrow_schema::SortOptions {
553                descending: false,
554                nulls_first: false,
555            },
556        )])
557        .unwrap();
558
559        let rows_nulls_last = converter_nulls_last
560            .convert_columns(&[Arc::new(array_with_nulls.clone())])
561            .unwrap();
562        let rows_values_last = converter_nulls_last
563            .convert_columns(&[Arc::new(array_with_values.clone())])
564            .unwrap();
565
566        // values should come before nulls when nulls_first = false
567        assert!(
568            rows_values_last.row(0) < rows_nulls_last.row(0),
569            "values should come before nulls when nulls_first=false"
570        );
571    }
572
573    #[test]
574    fn test_run_end_encoded_mixed_sorting() {
575        // Test sorting with mixed values and nulls to ensure complex scenarios work
576
577        let array1: RunArray<Int32Type> = vec![Some("apple"), None].into_iter().collect();
578        let array2: RunArray<Int32Type> = vec![None, Some("banana")].into_iter().collect();
579        let array3: RunArray<Int32Type> =
580            vec![Some("cherry"), Some("cherry")].into_iter().collect();
581
582        let converter = RowConverter::new(vec![SortField::new_with_options(
583            DataType::RunEndEncoded(
584                Arc::new(arrow_schema::Field::new("run_ends", DataType::Int32, false)),
585                Arc::new(arrow_schema::Field::new("values", DataType::Utf8, true)),
586            ),
587            arrow_schema::SortOptions {
588                descending: false,
589                nulls_first: true,
590            },
591        )])
592        .unwrap();
593
594        let rows1 = converter.convert_columns(&[Arc::new(array1)]).unwrap();
595        let rows2 = converter.convert_columns(&[Arc::new(array2)]).unwrap();
596        let rows3 = converter.convert_columns(&[Arc::new(array3)]).unwrap();
597
598        // With nulls_first=true, ascending:
599        // Row 0: array1[0]="apple", array2[0]=null, array3[0]="cherry" -> null < apple < cherry
600        // Row 1: array1[1]=null, array2[1]="banana", array3[1]="cherry" -> null < banana < cherry
601
602        // Compare first rows: null < apple < cherry
603        assert!(rows2.row(0) < rows1.row(0), "null should come before apple");
604        assert!(
605            rows1.row(0) < rows3.row(0),
606            "apple should come before cherry"
607        );
608
609        // Compare second rows: null < banana < cherry
610        assert!(
611            rows1.row(1) < rows2.row(1),
612            "null should come before banana"
613        );
614        assert!(
615            rows2.row(1) < rows3.row(1),
616            "banana should come before cherry"
617        );
618    }
619
620    #[test]
621    fn test_run_end_encoded_empty() {
622        // Test converting / decoding an empty RunEndEncodedArray
623        let values: Vec<&str> = vec![];
624        let array: RunArray<Int32Type> = values.into_iter().collect();
625
626        let converter = RowConverter::new(vec![SortField::new(DataType::RunEndEncoded(
627            Arc::new(arrow_schema::Field::new("run_ends", DataType::Int32, false)),
628            Arc::new(arrow_schema::Field::new("values", DataType::Utf8, true)),
629        ))])
630        .unwrap();
631
632        let rows = converter.convert_columns(&[Arc::new(array)]).unwrap();
633        assert_eq!(rows.num_rows(), 0);
634
635        // Likewise converting empty rows should yield an empty RunEndEncodedArray
636        let arrays = converter.convert_rows(&rows).unwrap();
637        assert_eq!(arrays.len(), 1);
638        // Verify both columns round-trip correctly
639        let result_ree = arrays[0].as_run::<Int32Type>();
640        assert_eq!(result_ree.len(), 0);
641    }
642}