arrow_row/
run.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::{variable, RowConverter, Rows, SortField};
19use arrow_array::types::RunEndIndexType;
20use arrow_array::{PrimitiveArray, RunArray};
21use arrow_buffer::{ArrowNativeType, ScalarBuffer};
22use arrow_schema::{ArrowError, SortOptions};
23
24/// Computes the lengths of each row for a RunEndEncodedArray
25pub fn compute_lengths<R: RunEndIndexType>(
26    lengths: &mut [usize],
27    rows: &Rows,
28    array: &RunArray<R>,
29) {
30    let run_ends = array.run_ends().values();
31    let mut logical_start = 0;
32
33    // Iterate over each run and apply the same length to all logical positions in the run
34    for (physical_idx, &run_end) in run_ends.iter().enumerate() {
35        let logical_end = run_end.as_usize();
36        let row = rows.row(physical_idx);
37        let encoded_len = variable::encoded_len(Some(row.data));
38
39        // Add the same length for all logical positions in this run
40        for length in &mut lengths[logical_start..logical_end] {
41            *length += encoded_len;
42        }
43
44        logical_start = logical_end;
45    }
46}
47
48/// Encodes the provided `RunEndEncodedArray` to `out` with the provided `SortOptions`
49///
50/// `rows` should contain the encoded values
51pub fn encode<R: RunEndIndexType>(
52    data: &mut [u8],
53    offsets: &mut [usize],
54    rows: &Rows,
55    opts: SortOptions,
56    array: &RunArray<R>,
57) {
58    let run_ends = array.run_ends();
59
60    let mut logical_idx = 0;
61    let mut offset_idx = 1; // Skip first offset
62
63    // Iterate over each run
64    for physical_idx in 0..run_ends.values().len() {
65        let run_end = run_ends.values()[physical_idx].as_usize();
66
67        // Process all elements in this run
68        while logical_idx < run_end && offset_idx < offsets.len() {
69            let offset = &mut offsets[offset_idx];
70            let out = &mut data[*offset..];
71
72            // Use variable-length encoding to make the data self-describing
73            let row = rows.row(physical_idx);
74            let bytes_written = variable::encode_one(out, Some(row.data), opts);
75            *offset += bytes_written;
76
77            logical_idx += 1;
78            offset_idx += 1;
79        }
80
81        // Break if we've processed all offsets
82        if offset_idx >= offsets.len() {
83            break;
84        }
85    }
86}
87
88/// Decodes a RunEndEncodedArray from `rows` with the provided `options`
89///
90/// # Safety
91///
92/// `rows` must contain valid data for the provided `converter`
93pub unsafe fn decode<R: RunEndIndexType>(
94    converter: &RowConverter,
95    rows: &mut [&[u8]],
96    field: &SortField,
97    validate_utf8: bool,
98) -> Result<RunArray<R>, ArrowError> {
99    if rows.is_empty() {
100        let values = converter.convert_raw(&mut [], validate_utf8)?;
101        let run_ends_array = PrimitiveArray::<R>::new(ScalarBuffer::from(vec![]), None);
102        return RunArray::<R>::try_new(&run_ends_array, &values[0]);
103    }
104
105    // Decode each row's REE data and collect the decoded values
106    let mut decoded_values = Vec::new();
107    let mut run_ends = Vec::new();
108    let mut unique_row_indices = Vec::new();
109
110    // Process each row to extract its REE data (following decode_binary pattern)
111    let mut decoded_data = Vec::new();
112    for (idx, row) in rows.iter_mut().enumerate() {
113        decoded_data.clear();
114        // Extract the decoded value data from this row
115        let consumed = variable::decode_blocks(row, field.options, |block| {
116            decoded_data.extend_from_slice(block);
117        });
118
119        // Handle bit inversion for descending sort (following decode_binary pattern)
120        if field.options.descending {
121            decoded_data.iter_mut().for_each(|b| *b = !*b);
122        }
123
124        // Update the row to point past the consumed REE data
125        *row = &row[consumed..];
126
127        // Check if this decoded value is the same as the previous one to identify runs
128        let is_new_run =
129            idx == 0 || decoded_data != decoded_values[*unique_row_indices.last().unwrap()];
130
131        if is_new_run {
132            // This is a new unique value - end the previous run if any
133            if idx > 0 {
134                run_ends.push(R::Native::usize_as(idx));
135            }
136            unique_row_indices.push(decoded_values.len());
137            decoded_values.push(decoded_data.clone());
138        }
139    }
140    // Add the final run end
141    run_ends.push(R::Native::usize_as(rows.len()));
142
143    // Convert the unique decoded values using the row converter
144    let mut unique_rows: Vec<&[u8]> = decoded_values.iter().map(|v| v.as_slice()).collect();
145    let values = if unique_rows.is_empty() {
146        converter.convert_raw(&mut [], validate_utf8)?
147    } else {
148        converter.convert_raw(&mut unique_rows, validate_utf8)?
149    };
150
151    // Create run ends array
152    let run_ends_array = PrimitiveArray::<R>::new(ScalarBuffer::from(run_ends), None);
153
154    // Create the RunEndEncodedArray
155    RunArray::<R>::try_new(&run_ends_array, &values[0])
156}
157
#[cfg(test)]
mod tests {
    use crate::{RowConverter, Rows, SortField};
    use arrow_array::cast::AsArray;
    use arrow_array::types::{Int16Type, Int32Type, Int64Type, RunEndIndexType};
    use arrow_array::{Array, Int64Array, PrimitiveArray, RunArray, StringArray};
    use arrow_schema::{DataType, Field, SortOptions};
    use std::sync::Arc;

    /// Returns a `RunEndEncoded` type with non-nullable `run_end_type` run ends and
    /// nullable `values_type` values.
    fn ree_type(run_end_type: DataType, values_type: DataType) -> DataType {
        DataType::RunEndEncoded(
            Arc::new(Field::new("run_ends", run_end_type, false)),
            Arc::new(Field::new("values", values_type, true)),
        )
    }

    /// Shorthand for building `SortOptions`.
    fn options(descending: bool, nulls_first: bool) -> SortOptions {
        SortOptions {
            descending,
            nulls_first,
        }
    }

    /// Creates a single-column converter for `RunEndEncoded<Int32, Utf8>` ordered by `opts`.
    fn utf8_converter(opts: SortOptions) -> RowConverter {
        let field = SortField::new_with_options(ree_type(DataType::Int32, DataType::Utf8), opts);
        RowConverter::new(vec![field]).unwrap()
    }

    /// Encodes `array` as the single column of `converter`.
    fn to_rows(converter: &RowConverter, array: &RunArray<Int32Type>) -> Rows {
        converter
            .convert_columns(&[Arc::new(array.clone())])
            .unwrap()
    }

    /// Asserts that `array` survives an encode/decode round trip unchanged, using the
    /// default sort options when `sort_options` is `None`.
    fn assert_roundtrip<R: RunEndIndexType>(
        array: &RunArray<R>,
        run_end_type: DataType,
        values_type: DataType,
        sort_options: Option<SortOptions>,
    ) {
        let data_type = ree_type(run_end_type, values_type);
        let sort_field = match sort_options {
            Some(opts) => SortField::new_with_options(data_type, opts),
            None => SortField::new(data_type),
        };
        let converter = RowConverter::new(vec![sort_field]).unwrap();

        let rows = converter
            .convert_columns(&[Arc::new(array.clone())])
            .unwrap();
        let arrays = converter.convert_rows(&rows).unwrap();
        let result = arrays[0].as_run::<R>();

        assert_eq!(array, result);
    }

    #[test]
    fn test_run_end_encoded_supports_datatype() {
        // The RowConverter must accept run-end encoded columns
        assert!(RowConverter::supports_datatype(&ree_type(
            DataType::Int32,
            DataType::Utf8
        )));
    }

    #[test]
    fn test_run_end_encoded_round_trip_int16_int64s() {
        // Non-string values (most other tests use strings), with the value 100 appearing
        // in two non-adjacent runs
        let values = Int64Array::from(vec![100, 200, 100, 300]);
        let run_ends = vec![2, 3, 5, 6];
        let array: RunArray<Int16Type> =
            RunArray::try_new(&PrimitiveArray::from(run_ends), &values).unwrap();

        assert_roundtrip(&array, DataType::Int16, DataType::Int64, None);
    }

    #[test]
    fn test_run_end_encoded_round_trip_int32_int64s() {
        // Same data as the Int16 test, with Int32 run ends
        let values = Int64Array::from(vec![100, 200, 100, 300]);
        let run_ends = vec![2, 3, 5, 6];
        let array: RunArray<Int32Type> =
            RunArray::try_new(&PrimitiveArray::from(run_ends), &values).unwrap();

        assert_roundtrip(&array, DataType::Int32, DataType::Int64, None);
    }

    #[test]
    fn test_run_end_encoded_round_trip_int64_int64s() {
        // Same data as the Int16 test, with Int64 run ends
        let values = Int64Array::from(vec![100, 200, 100, 300]);
        let run_ends = vec![2, 3, 5, 6];
        let array: RunArray<Int64Type> =
            RunArray::try_new(&PrimitiveArray::from(run_ends), &values).unwrap();

        assert_roundtrip(&array, DataType::Int64, DataType::Int64, None);
    }

    #[test]
    fn test_run_end_encoded_round_trip_strings() {
        let array: RunArray<Int32Type> = vec!["b", "b", "a"].into_iter().collect();

        assert_roundtrip(&array, DataType::Int32, DataType::Utf8, None);
    }

    #[test]
    fn test_run_end_encoded_round_trip_strings_with_nulls() {
        let array: RunArray<Int32Type> = vec![Some("b"), Some("b"), None, Some("a")]
            .into_iter()
            .collect();

        assert_roundtrip(&array, DataType::Int32, DataType::Utf8, None);
    }

    #[test]
    fn test_run_end_encoded_ascending_descending_round_trip() {
        // Round trips must hold for both sort directions
        let values = StringArray::from(vec![Some("apple"), Some("banana"), Some("cherry")]);
        let array: RunArray<Int32Type> =
            RunArray::try_new(&PrimitiveArray::from(vec![2, 4, 6]), &values).unwrap();

        for descending in [false, true] {
            let opts = options(descending, true);
            assert_roundtrip(&array, DataType::Int32, DataType::Utf8, Some(opts));
        }
    }

    #[test]
    fn test_run_end_encoded_sort_configurations_basic() {
        // A single-run array must round-trip for both sort directions
        let array: RunArray<Int32Type> = vec!["test"].into_iter().collect();

        for descending in [false, true] {
            let opts = options(descending, true);
            assert_roundtrip(&array, DataType::Int32, DataType::Utf8, Some(opts));
        }
    }

    #[test]
    fn test_run_end_encoded_nulls_first_last_configurations() {
        // Both null placements must round-trip exactly, including an actual null value
        let array: RunArray<Int32Type> = vec![Some("simple"), None].into_iter().collect();

        for nulls_first in [true, false] {
            let opts = options(false, nulls_first);
            assert_roundtrip(&array, DataType::Int32, DataType::Utf8, Some(opts));
        }
    }

    #[test]
    fn test_run_end_encoded_row_consumption() {
        // Decoding must consume every row's REE bytes, not just one per run. A second
        // column after the REE column would otherwise be decoded from the wrong position.
        let array: RunArray<Int32Type> = vec!["a", "a", "b", "b", "b", "c"].into_iter().collect();
        let string_array = StringArray::from(vec!["x", "y", "z", "w", "u", "v"]);

        let converter = RowConverter::new(vec![
            SortField::new(ree_type(DataType::Int32, DataType::Utf8)),
            SortField::new(DataType::Utf8),
        ])
        .unwrap();

        let rows = converter
            .convert_columns(&[Arc::new(array.clone()), Arc::new(string_array.clone())])
            .unwrap();
        let arrays = converter.convert_rows(&rows).unwrap();

        let result_ree = arrays[0].as_run::<Int32Type>();
        let result_string = arrays[1].as_string::<i32>();

        // Both columns must be identical to the originals
        assert_eq!(result_ree.values().as_ref(), array.values().as_ref());
        assert_eq!(result_ree.run_ends().values(), array.run_ends().values());
        assert_eq!(*result_string, string_array);
    }

    #[test]
    fn test_run_end_encoded_sorting_behavior() {
        // The row bytes must order the same way as the underlying values
        let apple: RunArray<Int32Type> = vec!["apple", "apple"].into_iter().collect();
        let banana: RunArray<Int32Type> = vec!["banana", "banana"].into_iter().collect();
        let cherry: RunArray<Int32Type> = vec!["cherry", "cherry"].into_iter().collect();

        // Ascending: apple < banana < cherry
        let asc = utf8_converter(options(false, true));
        let apple_asc = to_rows(&asc, &apple);
        let banana_asc = to_rows(&asc, &banana);
        let cherry_asc = to_rows(&asc, &cherry);
        assert!(
            apple_asc.row(0) < banana_asc.row(0),
            "apple should come before banana in ascending order"
        );
        assert!(
            banana_asc.row(0) < cherry_asc.row(0),
            "banana should come before cherry in ascending order"
        );
        assert!(
            apple_asc.row(0) < cherry_asc.row(0),
            "apple should come before cherry in ascending order"
        );

        // Descending: byte comparison is still ascending, so cherry < banana < apple
        let desc = utf8_converter(options(true, true));
        let apple_desc = to_rows(&desc, &apple);
        let banana_desc = to_rows(&desc, &banana);
        let cherry_desc = to_rows(&desc, &cherry);
        assert!(
            cherry_desc.row(0) < banana_desc.row(0),
            "cherry should come before banana in descending order (byte-wise)"
        );
        assert!(
            banana_desc.row(0) < apple_desc.row(0),
            "banana should come before apple in descending order (byte-wise)"
        );
        assert!(
            cherry_desc.row(0) < apple_desc.row(0),
            "cherry should come before apple in descending order (byte-wise)"
        );
    }

    #[test]
    fn test_run_end_encoded_null_sorting() {
        // Null placement must follow `nulls_first`
        let nulls: RunArray<Int32Type> = vec![None, None].into_iter().collect();
        let values: RunArray<Int32Type> = vec!["apple", "apple"].into_iter().collect();

        let nulls_first = utf8_converter(options(false, true));
        let nulls_rows = to_rows(&nulls_first, &nulls);
        let values_rows = to_rows(&nulls_first, &values);
        assert!(
            nulls_rows.row(0) < values_rows.row(0),
            "nulls should come before values when nulls_first=true"
        );

        let nulls_last = utf8_converter(options(false, false));
        let nulls_rows = to_rows(&nulls_last, &nulls);
        let values_rows = to_rows(&nulls_last, &values);
        assert!(
            values_rows.row(0) < nulls_rows.row(0),
            "values should come before nulls when nulls_first=false"
        );
    }

    #[test]
    fn test_run_end_encoded_mixed_sorting() {
        // Rows mixing values and nulls across several arrays, ascending with nulls first:
        //   row 0: "apple", null, "cherry"   -> null < apple < cherry
        //   row 1: null, "banana", "cherry"  -> null < banana < cherry
        let array1: RunArray<Int32Type> = vec![Some("apple"), None].into_iter().collect();
        let array2: RunArray<Int32Type> = vec![None, Some("banana")].into_iter().collect();
        let array3: RunArray<Int32Type> =
            vec![Some("cherry"), Some("cherry")].into_iter().collect();

        let converter = utf8_converter(options(false, true));
        let rows1 = to_rows(&converter, &array1);
        let rows2 = to_rows(&converter, &array2);
        let rows3 = to_rows(&converter, &array3);

        assert!(rows2.row(0) < rows1.row(0), "null should come before apple");
        assert!(rows1.row(0) < rows3.row(0), "apple should come before cherry");

        assert!(rows1.row(1) < rows2.row(1), "null should come before banana");
        assert!(rows2.row(1) < rows3.row(1), "banana should come before cherry");
    }

    #[test]
    fn test_run_end_encoded_empty() {
        // An empty RunEndEncodedArray must encode to zero rows and decode back to empty
        let values: Vec<&str> = vec![];
        let array: RunArray<Int32Type> = values.into_iter().collect();

        let converter = utf8_converter(SortOptions::default());

        let rows = converter.convert_columns(&[Arc::new(array)]).unwrap();
        assert_eq!(rows.num_rows(), 0);

        // Converting empty rows back must yield a single, empty RunEndEncodedArray
        let arrays = converter.convert_rows(&rows).unwrap();
        assert_eq!(arrays.len(), 1);
        let result_ree = arrays[0].as_run::<Int32Type>();
        assert_eq!(result_ree.len(), 0);
    }
}