1use std::ops::Range;
21
22use arrow_array::{Array, ArrayRef};
23use arrow_buffer::BooleanBuffer;
24use arrow_schema::{ArrowError, SortOptions};
25
26use crate::cmp::distinct;
27use crate::ord::make_comparator;
28
29#[derive(Debug, Clone)]
31pub struct Partitions(Option<BooleanBuffer>);
32
33impl Partitions {
34 pub fn ranges(&self) -> Vec<Range<usize>> {
39 let boundaries = match &self.0 {
40 Some(boundaries) => boundaries,
41 None => return vec![],
42 };
43
44 let mut out = vec![];
45 let mut current = 0;
46 for idx in boundaries.set_indices() {
47 let t = current;
48 current = idx + 1;
49 out.push(t..current)
50 }
51 let last = boundaries.len() + 1;
52 if current != last {
53 out.push(current..last)
54 }
55 out
56 }
57
58 pub fn len(&self) -> usize {
60 match &self.0 {
61 Some(b) => b.count_set_bits() + 1,
62 None => 0,
63 }
64 }
65
66 pub fn is_empty(&self) -> bool {
68 self.0.is_none()
69 }
70}
71
72pub fn partition(columns: &[ArrayRef]) -> Result<Partitions, ArrowError> {
128 if columns.is_empty() {
129 return Err(ArrowError::InvalidArgumentError(
130 "Partition requires at least one column".to_string(),
131 ));
132 }
133 let num_rows = columns[0].len();
134 if columns.iter().any(|item| item.len() != num_rows) {
135 return Err(ArrowError::InvalidArgumentError(
136 "Partition columns have different row counts".to_string(),
137 ));
138 };
139
140 match num_rows {
141 0 => return Ok(Partitions(None)),
142 1 => return Ok(Partitions(Some(BooleanBuffer::new_unset(0)))),
143 _ => {}
144 }
145
146 let acc = find_boundaries(&columns[0])?;
147 let acc = columns
148 .iter()
149 .skip(1)
150 .try_fold(acc, |acc, c| find_boundaries(c.as_ref()).map(|b| &acc | &b))?;
151
152 Ok(Partitions(Some(acc)))
153}
154
155fn find_boundaries(v: &dyn Array) -> Result<BooleanBuffer, ArrowError> {
157 let slice_len = v.len() - 1;
158 let v1 = v.slice(0, slice_len);
159 let v2 = v.slice(1, slice_len);
160
161 if !v.data_type().is_nested() {
162 return Ok(distinct(&v1, &v2)?.values().clone());
163 }
164 let cmp = make_comparator(&v1, &v2, SortOptions::default())?;
167 Ok((0..slice_len).map(|i| !cmp(i, i).is_eq()).collect())
168}
169
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;
    use arrow_array::*;
    use arrow_schema::DataType;

    /// Partitions `columns` and returns the resulting row ranges, panicking on error.
    fn partition_ranges(columns: &[ArrayRef]) -> Vec<std::ops::Range<usize>> {
        partition(columns).unwrap().ranges()
    }

    #[test]
    fn test_partition_empty() {
        // No columns at all is rejected.
        let message = partition(&[]).unwrap_err().to_string();
        assert_eq!(
            message,
            "Invalid argument error: Partition requires at least one column"
        );
    }

    #[test]
    fn test_partition_unaligned_rows() {
        // Columns with different lengths are rejected.
        let two_rows: ArrayRef = Arc::new(Int64Array::from(vec![None, Some(-1)]));
        let one_row: ArrayRef = Arc::new(StringArray::from(vec![Some("foo")]));
        let message = partition(&[two_rows, one_row]).unwrap_err().to_string();
        assert_eq!(
            message,
            "Invalid argument error: Partition columns have different row counts"
        )
    }

    #[test]
    fn test_partition_small() {
        // Zero rows: no partitions.
        let empty_column =
            || Arc::new(Int32Array::new(Vec::<i32>::new().into(), None)) as ArrayRef;
        let partitions = partition(&[empty_column(), empty_column(), empty_column()]).unwrap();
        assert_eq!(partitions.len(), 0);
        assert!(partitions.is_empty());

        // One row: a single partition covering it.
        let single_row = || Arc::new(Int32Array::from(vec![1])) as ArrayRef;
        let ranges = partition_ranges(&[single_row(), single_row()]);
        assert_eq!(ranges.len(), 1);
        assert_eq!(ranges[0], 0..1);
    }

    #[test]
    fn test_partition_single_column() {
        let column: ArrayRef = Arc::new(Int64Array::from(vec![1, 2, 2, 2, 2, 2, 2, 2, 9]));
        assert_eq!(partition_ranges(&[column]), vec![0..1, 1..8, 8..9]);
    }

    #[test]
    fn test_partition_all_equal_values() {
        let column: ArrayRef = Arc::new(Int64Array::from_value(1, 1000));
        assert_eq!(partition_ranges(&[column]), vec![0..1000]);
    }

    #[test]
    fn test_partition_all_null_values() {
        // Adjacent nulls do not start a new partition.
        let columns: Vec<ArrayRef> = vec![
            new_null_array(&DataType::Int8, 1000),
            new_null_array(&DataType::UInt16, 1000),
        ];
        assert_eq!(partition_ranges(&columns), vec![0..1000]);
    }

    #[test]
    fn test_partition_unique_column_1() {
        let numbers: ArrayRef = Arc::new(Int64Array::from(vec![None, Some(-1)]));
        let words: ArrayRef = Arc::new(StringArray::from(vec![Some("foo"), Some("bar")]));
        assert_eq!(partition_ranges(&[numbers, words]), vec![0..1, 1..2]);
    }

    #[test]
    fn test_partition_unique_column_2() {
        let numbers: ArrayRef = Arc::new(Int64Array::from(vec![None, Some(-1), Some(-1)]));
        let words: ArrayRef = Arc::new(StringArray::from(vec![
            Some("foo"),
            Some("bar"),
            Some("apple"),
        ]));
        assert_eq!(
            partition_ranges(&[numbers, words]),
            vec![0..1, 1..2, 2..3]
        );
    }

    #[test]
    fn test_partition_non_unique_column_1() {
        let numbers: ArrayRef =
            Arc::new(Int64Array::from(vec![None, Some(-1), Some(-1), Some(1)]));
        let words: ArrayRef = Arc::new(StringArray::from(vec![
            Some("foo"),
            Some("bar"),
            Some("bar"),
            Some("bar"),
        ]));
        assert_eq!(
            partition_ranges(&[numbers, words]),
            vec![0..1, 1..3, 3..4]
        );
    }

    #[test]
    fn test_partition_masked_nulls() {
        // Values sitting underneath null slots must not influence the result.
        let constant: ArrayRef = Arc::new(Int64Array::new(vec![1; 9].into(), None));

        let second_values: Vec<i64> = vec![1, 1, 2, 2, 2, 3, 3, 3, 3];
        let second_validity = vec![false, true, true, true, true, false, false, true, false];
        let second: ArrayRef = Arc::new(Int64Array::new(
            second_values.into(),
            Some(second_validity.into()),
        ));

        let third_values: Vec<i64> = vec![1, 1, 2, 2, 2, 2, 2, 3, 7];
        let third_validity = vec![true, true, true, true, false, true, true, true, false];
        let third: ArrayRef = Arc::new(Int64Array::new(
            third_values.into(),
            Some(third_validity.into()),
        ));

        assert_eq!(
            partition_ranges(&[constant, second, third]),
            vec![0..1, 1..2, 2..4, 4..5, 5..7, 7..8, 8..9]
        );
    }

    #[test]
    fn test_partition_nested() {
        // A struct column exercises the comparator-based path for nested types.
        let field: ArrayRef = Arc::new(Int64Array::from(vec![
            None,
            None,
            Some(1),
            Some(2),
            Some(2),
            Some(2),
            Some(3),
            Some(4),
        ]));
        let nested: ArrayRef = Arc::new(StructArray::try_from(vec![("f1", field)]).unwrap());
        let flat: ArrayRef = Arc::new(Int64Array::from(vec![1, 1, 1, 2, 3, 3, 3, 4]));

        assert_eq!(
            partition_ranges(&[nested, flat]),
            vec![0..2, 2..3, 3..4, 4..6, 6..7, 7..8]
        )
    }
}