Skip to main content

arrow_cast/cast/
run_array.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::cast::*;
19use arrow_ord::partition::partition;
20
21/// Attempts to cast a `RunArray` with index type K into
22/// `to_type` for supported types.
23pub(crate) fn run_end_encoded_cast<K: RunEndIndexType>(
24    array: &dyn Array,
25    to_type: &DataType,
26    cast_options: &CastOptions,
27) -> Result<ArrayRef, ArrowError> {
28    match array.data_type() {
29        DataType::RunEndEncoded(_, _) => {
30            let run_array = array
31                .as_any()
32                .downcast_ref::<RunArray<K>>()
33                .ok_or_else(|| ArrowError::CastError("Expected RunArray".to_string()))?;
34
35            match to_type {
36                // Stay as RunEndEncoded, cast only the values
37                DataType::RunEndEncoded(target_index_field, target_value_field) => {
38                    let values = run_array.values_slice();
39                    let cast_values = cast_with_options(
40                        values.as_ref(),
41                        target_value_field.data_type(),
42                        cast_options,
43                    )?;
44
45                    let run_ends_array =
46                        PrimitiveArray::<K>::from_iter_values(run_array.run_ends().sliced_values());
47                    let cast_run_ends = cast_with_options(
48                        &run_ends_array,
49                        target_index_field.data_type(),
50                        cast_options,
51                    )?;
52                    let new_run_array: ArrayRef = match target_index_field.data_type() {
53                        DataType::Int16 => {
54                            let re = cast_run_ends.as_primitive::<Int16Type>();
55                            Arc::new(RunArray::<Int16Type>::try_new(re, cast_values.as_ref())?)
56                        }
57                        DataType::Int32 => {
58                            let re = cast_run_ends.as_primitive::<Int32Type>();
59                            Arc::new(RunArray::<Int32Type>::try_new(re, cast_values.as_ref())?)
60                        }
61                        DataType::Int64 => {
62                            let re = cast_run_ends.as_primitive::<Int64Type>();
63                            Arc::new(RunArray::<Int64Type>::try_new(re, cast_values.as_ref())?)
64                        }
65                        _ => {
66                            return Err(ArrowError::CastError(
67                                "Run-end type must be i16, i32, or i64".to_string(),
68                            ));
69                        }
70                    };
71                    Ok(new_run_array)
72                }
73
74                // Expand to logical form
75                _ => {
76                    let values = run_array.values();
77                    let len = run_array.len();
78                    let offset = run_array.offset();
79                    let run_ends = run_array.run_ends().values();
80
81                    let mut indices = Vec::with_capacity(len);
82                    let mut physical_idx = run_array.get_start_physical_index();
83
84                    for logical_idx in offset..offset + len {
85                        if logical_idx == run_ends[physical_idx].as_usize() {
86                            // If the logical index is equal to the (next) run end, increment the physical index,
87                            // since we are at the end of a run.
88                            physical_idx += 1;
89                        }
90                        indices.push(physical_idx as i32);
91                    }
92
93                    let taken = take(&values, &Int32Array::from_iter_values(indices), None)?;
94                    if taken.data_type() != to_type {
95                        cast_with_options(taken.as_ref(), to_type, cast_options)
96                    } else {
97                        Ok(taken)
98                    }
99                }
100            }
101        }
102
103        _ => Err(ArrowError::CastError(format!(
104            "Cannot cast array of type {:?} to RunEndEncodedArray",
105            array.data_type()
106        ))),
107    }
108}
109
110/// Attempts to encode an array into a `RunArray` with index type K
111/// and value type `value_type`
112pub(crate) fn cast_to_run_end_encoded<K: RunEndIndexType>(
113    array: &ArrayRef,
114    value_type: &DataType,
115    cast_options: &CastOptions,
116) -> Result<ArrayRef, ArrowError> {
117    let mut run_ends_builder = PrimitiveBuilder::<K>::new();
118
119    // Cast the input array to the target value type if necessary
120    let cast_array = if array.data_type() == value_type {
121        array
122    } else {
123        &cast_with_options(array, value_type, cast_options)?
124    };
125
126    // Return early if the array to cast is empty
127    if cast_array.is_empty() {
128        let empty_run_ends = run_ends_builder.finish();
129        let empty_values = make_array(ArrayData::new_empty(value_type));
130        return Ok(Arc::new(RunArray::<K>::try_new(
131            &empty_run_ends,
132            empty_values.as_ref(),
133        )?));
134    }
135
136    // REE arrays are handled by run_end_encoded_cast
137    if let DataType::RunEndEncoded(_, _) = array.data_type() {
138        return Err(ArrowError::CastError(
139            "Source array is already a RunEndEncoded array, should have been handled by run_end_encoded_cast".to_string()
140        ));
141    }
142
143    // Partition the array to identify runs of consecutive equal values
144    let partitions = partition(&[Arc::clone(cast_array)])?;
145    let size = partitions.len();
146    let mut run_ends = Vec::with_capacity(size);
147    let mut values_indexes = Vec::with_capacity(size);
148    let mut last_partition_end = 0;
149    for partition in partitions.ranges() {
150        values_indexes.push(last_partition_end);
151        run_ends.push(partition.end);
152        last_partition_end = partition.end;
153    }
154
155    // Build the run_ends array
156    for run_end in run_ends {
157        run_ends_builder.append_value(K::Native::from_usize(run_end).ok_or_else(|| {
158            ArrowError::CastError(format!("Run end index out of range: {}", run_end))
159        })?);
160    }
161    let run_ends_array = run_ends_builder.finish();
162    // Build the values array by taking elements at the run start positions
163    let indices = PrimitiveArray::<UInt32Type>::from_iter_values(
164        values_indexes.iter().map(|&idx| idx as u32),
165    );
166    let values_array = take(&cast_array, &indices, None)?;
167
168    // Create and return the RunArray
169    let run_array = RunArray::<K>::try_new(&run_ends_array, values_array.as_ref())?;
170    Ok(Arc::new(run_array))
171}