arrow_cast/cast/
run_array.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::cast::*;
19use arrow_ord::partition::partition;
20
21/// Attempts to cast a `RunArray` with index type K into
22/// `to_type` for supported types.
23pub(crate) fn run_end_encoded_cast<K: RunEndIndexType>(
24    array: &dyn Array,
25    to_type: &DataType,
26    cast_options: &CastOptions,
27) -> Result<ArrayRef, ArrowError> {
28    match array.data_type() {
29        DataType::RunEndEncoded(_, _) => {
30            let run_array = array
31                .as_any()
32                .downcast_ref::<RunArray<K>>()
33                .ok_or_else(|| ArrowError::CastError("Expected RunArray".to_string()))?;
34
35            let values = run_array.values();
36
37            match to_type {
38                // Stay as RunEndEncoded, cast only the values
39                DataType::RunEndEncoded(target_index_field, target_value_field) => {
40                    let cast_values =
41                        cast_with_options(values, target_value_field.data_type(), cast_options)?;
42
43                    let run_ends_array = PrimitiveArray::<K>::from_iter_values(
44                        run_array.run_ends().values().iter().copied(),
45                    );
46                    let cast_run_ends = cast_with_options(
47                        &run_ends_array,
48                        target_index_field.data_type(),
49                        cast_options,
50                    )?;
51                    let new_run_array: ArrayRef = match target_index_field.data_type() {
52                        DataType::Int16 => {
53                            let re = cast_run_ends.as_primitive::<Int16Type>();
54                            Arc::new(RunArray::<Int16Type>::try_new(re, cast_values.as_ref())?)
55                        }
56                        DataType::Int32 => {
57                            let re = cast_run_ends.as_primitive::<Int32Type>();
58                            Arc::new(RunArray::<Int32Type>::try_new(re, cast_values.as_ref())?)
59                        }
60                        DataType::Int64 => {
61                            let re = cast_run_ends.as_primitive::<Int64Type>();
62                            Arc::new(RunArray::<Int64Type>::try_new(re, cast_values.as_ref())?)
63                        }
64                        _ => {
65                            return Err(ArrowError::CastError(
66                                "Run-end type must be i16, i32, or i64".to_string(),
67                            ));
68                        }
69                    };
70                    Ok(Arc::new(new_run_array))
71                }
72
73                // Expand to logical form
74                _ => {
75                    let run_ends = run_array.run_ends().values().to_vec();
76                    let mut indices = Vec::with_capacity(run_array.run_ends().len());
77                    let mut physical_idx: usize = 0;
78                    for logical_idx in 0..run_array.run_ends().len() {
79                        // If the logical index is equal to the (next) run end, increment the physical index,
80                        // since we are at the end of a run.
81                        if logical_idx == run_ends[physical_idx].as_usize() {
82                            physical_idx += 1;
83                        }
84                        indices.push(physical_idx as i32);
85                    }
86
87                    let taken = take(&values, &Int32Array::from_iter_values(indices), None)?;
88                    if taken.data_type() != to_type {
89                        cast_with_options(taken.as_ref(), to_type, cast_options)
90                    } else {
91                        Ok(taken)
92                    }
93                }
94            }
95        }
96
97        _ => Err(ArrowError::CastError(format!(
98            "Cannot cast array of type {:?} to RunEndEncodedArray",
99            array.data_type()
100        ))),
101    }
102}
103
104/// Attempts to encode an array into a `RunArray` with index type K
105/// and value type `value_type`
106pub(crate) fn cast_to_run_end_encoded<K: RunEndIndexType>(
107    array: &ArrayRef,
108    value_type: &DataType,
109    cast_options: &CastOptions,
110) -> Result<ArrayRef, ArrowError> {
111    let mut run_ends_builder = PrimitiveBuilder::<K>::new();
112
113    // Cast the input array to the target value type if necessary
114    let cast_array = if array.data_type() == value_type {
115        array
116    } else {
117        &cast_with_options(array, value_type, cast_options)?
118    };
119
120    // Return early if the array to cast is empty
121    if cast_array.is_empty() {
122        let empty_run_ends = run_ends_builder.finish();
123        let empty_values = make_array(ArrayData::new_empty(value_type));
124        return Ok(Arc::new(RunArray::<K>::try_new(
125            &empty_run_ends,
126            empty_values.as_ref(),
127        )?));
128    }
129
130    // REE arrays are handled by run_end_encoded_cast
131    if let DataType::RunEndEncoded(_, _) = array.data_type() {
132        return Err(ArrowError::CastError(
133            "Source array is already a RunEndEncoded array, should have been handled by run_end_encoded_cast".to_string()
134        ));
135    }
136
137    // Partition the array to identify runs of consecutive equal values
138    let partitions = partition(&[Arc::clone(cast_array)])?;
139    let mut run_ends = Vec::new();
140    let mut values_indexes = Vec::new();
141    let mut last_partition_end = 0;
142    for partition in partitions.ranges() {
143        values_indexes.push(last_partition_end);
144        run_ends.push(partition.end);
145        last_partition_end = partition.end;
146    }
147
148    // Build the run_ends array
149    for run_end in run_ends {
150        run_ends_builder.append_value(K::Native::from_usize(run_end).ok_or_else(|| {
151            ArrowError::CastError(format!("Run end index out of range: {}", run_end))
152        })?);
153    }
154    let run_ends_array = run_ends_builder.finish();
155    // Build the values array by taking elements at the run start positions
156    let indices = PrimitiveArray::<UInt32Type>::from_iter_values(
157        values_indexes.iter().map(|&idx| idx as u32),
158    );
159    let values_array = take(&cast_array, &indices, None)?;
160
161    // Create and return the RunArray
162    let run_array = RunArray::<K>::try_new(&run_ends_array, values_array.as_ref())?;
163    Ok(Arc::new(run_array))
164}