Skip to main content

arrow_ord/
partition.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Defines partition kernel for `ArrayRef`
19
20use std::ops::Range;
21
22use arrow_array::{Array, ArrayRef};
23use arrow_buffer::BooleanBuffer;
24use arrow_schema::{ArrowError, SortOptions};
25
26use crate::cmp::{distinct, supports_distinct};
27use crate::ord::make_comparator;
28
29/// A computed set of partitions, see [`partition`]
30#[derive(Debug, Clone)]
31pub struct Partitions(Option<BooleanBuffer>);
32
33impl Partitions {
34    /// Returns the range of each partition
35    ///
36    /// Consecutive ranges will be contiguous: i.e [`(a, b)` and `(b, c)`], and
37    /// `start = 0` and `end = self.len()` for the first and last range respectively
38    pub fn ranges(&self) -> Vec<Range<usize>> {
39        let boundaries = match &self.0 {
40            Some(boundaries) => boundaries,
41            None => return vec![],
42        };
43
44        let mut out = vec![];
45        let mut current = 0;
46        for idx in boundaries.set_indices() {
47            let t = current;
48            current = idx + 1;
49            out.push(t..current)
50        }
51        let last = boundaries.len() + 1;
52        if current != last {
53            out.push(current..last)
54        }
55        out
56    }
57
58    /// Returns the number of partitions
59    pub fn len(&self) -> usize {
60        match &self.0 {
61            Some(b) => b.count_set_bits() + 1,
62            None => 0,
63        }
64    }
65
66    /// Returns true if this contains no partitions
67    pub fn is_empty(&self) -> bool {
68        self.0.is_none()
69    }
70}
71
72/// Given a list of lexicographically sorted columns, computes the [`Partitions`],
73/// where a partition consists of the set of consecutive rows with equal values
74///
75/// Returns an error if no columns are specified or all columns do not
76/// have the same number of rows.
77///
78/// # Example:
79///
80/// For example, given columns `x`, `y` and `z`, calling
81/// [`partition`]`(values, (x, y))` will divide the
82/// rows into ranges where the values of `(x, y)` are equal:
83///
84/// ```text
85/// ┌ ─ ┬───┬ ─ ─┌───┐─ ─ ┬───┬ ─ ─ ┐
86///     │ 1 │    │ 1 │    │ A │        Range: 0..1 (x=1, y=1)
87/// ├ ─ ┼───┼ ─ ─├───┤─ ─ ┼───┼ ─ ─ ┤
88///     │ 1 │    │ 2 │    │ B │
89/// │   ├───┤    ├───┤    ├───┤     │
90///     │ 1 │    │ 2 │    │ C │        Range: 1..4 (x=1, y=2)
91/// │   ├───┤    ├───┤    ├───┤     │
92///     │ 1 │    │ 2 │    │ D │
93/// ├ ─ ┼───┼ ─ ─├───┤─ ─ ┼───┼ ─ ─ ┤
94///     │ 2 │    │ 1 │    │ E │        Range: 4..5 (x=2, y=1)
95/// ├ ─ ┼───┼ ─ ─├───┤─ ─ ┼───┼ ─ ─ ┤
96///     │ 3 │    │ 1 │    │ F │        Range: 5..6 (x=3, y=1)
97/// └ ─ ┴───┴ ─ ─└───┘─ ─ ┴───┴ ─ ─ ┘
98///
99///       x        y        z     partition(&[x, y])
100/// ```
101///
102/// # Example Code
103///
104/// ```
105/// # use std::{sync::Arc, ops::Range};
106/// # use arrow_array::{RecordBatch, Int64Array, StringArray, ArrayRef};
107/// # use arrow_ord::sort::{SortColumn, SortOptions};
108/// # use arrow_ord::partition::partition;
109/// let batch = RecordBatch::try_from_iter(vec![
110///     ("x", Arc::new(Int64Array::from(vec![1, 1, 1, 1, 2, 3])) as ArrayRef),
111///     ("y", Arc::new(Int64Array::from(vec![1, 2, 2, 2, 1, 1])) as ArrayRef),
112///     ("z", Arc::new(StringArray::from(vec!["A", "B", "C", "D", "E", "F"])) as ArrayRef),
113/// ]).unwrap();
114///
115/// // Partition on first two columns
116/// let ranges = partition(&batch.columns()[..2]).unwrap().ranges();
117///
118/// let expected = vec![
119///     (0..1),
120///     (1..4),
121///     (4..5),
122///     (5..6),
123/// ];
124///
125/// assert_eq!(ranges, expected);
126/// ```
127pub fn partition(columns: &[ArrayRef]) -> Result<Partitions, ArrowError> {
128    if columns.is_empty() {
129        return Err(ArrowError::InvalidArgumentError(
130            "Partition requires at least one column".to_string(),
131        ));
132    }
133    let num_rows = columns[0].len();
134    if columns.iter().any(|item| item.len() != num_rows) {
135        return Err(ArrowError::InvalidArgumentError(
136            "Partition columns have different row counts".to_string(),
137        ));
138    };
139
140    match num_rows {
141        0 => return Ok(Partitions(None)),
142        1 => return Ok(Partitions(Some(BooleanBuffer::new_unset(0)))),
143        _ => {}
144    }
145
146    let acc = find_boundaries(&columns[0])?;
147    let acc = columns
148        .iter()
149        .skip(1)
150        .try_fold(acc, |acc, c| find_boundaries(c.as_ref()).map(|b| &acc | &b))?;
151
152    Ok(Partitions(Some(acc)))
153}
154
155/// Returns a mask with bits set whenever the value or nullability changes
156fn find_boundaries(v: &dyn Array) -> Result<BooleanBuffer, ArrowError> {
157    let slice_len = v.len() - 1;
158    let v1 = v.slice(0, slice_len);
159    let v2 = v.slice(1, slice_len);
160
161    if supports_distinct(v.data_type()) {
162        return Ok(distinct(&v1, &v2)?.values().clone());
163    }
164    // Given that we're only comparing values, null ordering in the input or
165    // sort options do not matter.
166    let cmp = make_comparator(&v1, &v2, SortOptions::default())?;
167    Ok((0..slice_len).map(|i| !cmp(i, i).is_eq()).collect())
168}
169
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;
    use arrow_array::*;
    use arrow_schema::DataType;

    #[test]
    fn test_partition_empty() {
        let err = partition(&[]).unwrap_err();
        assert_eq!(
            err.to_string(),
            "Invalid argument error: Partition requires at least one column"
        );
    }

    #[test]
    fn test_partition_unaligned_rows() {
        let input = vec![
            Arc::new(Int64Array::from(vec![None, Some(-1)])) as _,
            Arc::new(StringArray::from(vec![Some("foo")])) as _,
        ];
        let err = partition(&input).unwrap_err();
        assert_eq!(
            err.to_string(),
            "Invalid argument error: Partition columns have different row counts"
        )
    }

    #[test]
    fn test_partition_small() {
        let results = partition(&[
            Arc::new(Int32Array::new(vec![].into(), None)) as _,
            Arc::new(Int32Array::new(vec![].into(), None)) as _,
            Arc::new(Int32Array::new(vec![].into(), None)) as _,
        ])
        .unwrap();
        assert_eq!(results.len(), 0);
        assert!(results.is_empty());
        // Zero rows produce no ranges at all
        assert!(results.ranges().is_empty());

        let partitions = partition(&[
            Arc::new(Int32Array::from(vec![1])) as _,
            Arc::new(Int32Array::from(vec![1])) as _,
        ])
        .unwrap();
        assert_eq!(partitions.len(), 1);
        assert!(!partitions.is_empty());
        let results = partitions.ranges();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0], 0..1);
    }

    #[test]
    fn test_partition_single_column() {
        let a = Int64Array::from(vec![1, 2, 2, 2, 2, 2, 2, 2, 9]);
        let input = vec![Arc::new(a) as _];
        assert_eq!(
            partition(&input).unwrap().ranges(),
            vec![(0..1), (1..8), (8..9)],
        );
    }

    #[test]
    fn test_partition_unsorted_input() {
        // Only adjacent rows are compared, so equal values that are not
        // contiguous end up in separate partitions
        let input = vec![Arc::new(Int64Array::from(vec![1, 1, 2, 1])) as _];
        let partitions = partition(&input).unwrap();
        assert_eq!(partitions.len(), 3);
        assert_eq!(partitions.ranges(), vec![0..2, 2..3, 3..4]);
    }

    #[test]
    fn test_partition_all_equal_values() {
        let a = Int64Array::from_value(1, 1000);
        let input = vec![Arc::new(a) as _];
        assert_eq!(partition(&input).unwrap().ranges(), vec![(0..1000)]);
    }

    #[test]
    fn test_partition_all_null_values() {
        let input = vec![
            new_null_array(&DataType::Int8, 1000),
            new_null_array(&DataType::UInt16, 1000),
        ];
        assert_eq!(partition(&input).unwrap().ranges(), vec![(0..1000)]);
    }

    #[test]
    fn test_partition_unique_column_1() {
        let input = vec![
            Arc::new(Int64Array::from(vec![None, Some(-1)])) as _,
            Arc::new(StringArray::from(vec![Some("foo"), Some("bar")])) as _,
        ];
        assert_eq!(partition(&input).unwrap().ranges(), vec![(0..1), (1..2)],);
    }

    #[test]
    fn test_partition_unique_column_2() {
        let input = vec![
            Arc::new(Int64Array::from(vec![None, Some(-1), Some(-1)])) as _,
            Arc::new(StringArray::from(vec![
                Some("foo"),
                Some("bar"),
                Some("apple"),
            ])) as _,
        ];
        assert_eq!(
            partition(&input).unwrap().ranges(),
            vec![(0..1), (1..2), (2..3),],
        );
    }

    #[test]
    fn test_partition_non_unique_column_1() {
        let input = vec![
            Arc::new(Int64Array::from(vec![None, Some(-1), Some(-1), Some(1)])) as _,
            Arc::new(StringArray::from(vec![
                Some("foo"),
                Some("bar"),
                Some("bar"),
                Some("bar"),
            ])) as _,
        ];
        assert_eq!(
            partition(&input).unwrap().ranges(),
            vec![(0..1), (1..3), (3..4),],
        );
    }

    #[test]
    fn test_partition_masked_nulls() {
        let input = vec![
            Arc::new(Int64Array::new(vec![1; 9].into(), None)) as _,
            Arc::new(Int64Array::new(
                vec![1, 1, 2, 2, 2, 3, 3, 3, 3].into(),
                Some(vec![false, true, true, true, true, false, false, true, false].into()),
            )) as _,
            Arc::new(Int64Array::new(
                vec![1, 1, 2, 2, 2, 2, 2, 3, 7].into(),
                Some(vec![true, true, true, true, false, true, true, true, false].into()),
            )) as _,
        ];

        assert_eq!(
            partition(&input).unwrap().ranges(),
            vec![(0..1), (1..2), (2..4), (4..5), (5..7), (7..8), (8..9)],
        );
    }

    #[test]
    fn test_partition_run_end_encoded() {
        let run_ends = Int32Array::from(vec![2, 3, 5]);
        let values = StringArray::from(vec!["x", "y", "x"]);
        let ree = RunArray::try_new(&run_ends, &values).unwrap();
        // logical: ["x", "x", "y", "x", "x"]
        let input = vec![Arc::new(ree) as _];
        assert_eq!(partition(&input).unwrap().ranges(), vec![0..2, 2..3, 3..5],);
    }

    #[test]
    fn test_partition_nested_run_end_encoded() {
        // Inner REE (values of the outer): run_ends [1, 2, 3], values ["x", "y", "x"]
        // logical length 3: ["x", "y", "x"]
        let inner_run_ends = Int32Array::from(vec![1, 2, 3]);
        let inner_values = StringArray::from(vec!["x", "y", "x"]);
        let inner_ree = RunArray::try_new(&inner_run_ends, &inner_values).unwrap();

        // Outer REE: run_ends [2, 3, 5], values = inner_ree (length 3)
        // logical: rows 0,1 → inner[0]="x", row 2 → inner[1]="y", rows 3,4 → inner[2]="x"
        // = ["x", "x", "y", "x", "x"]
        let outer_run_ends = Int32Array::from(vec![2, 3, 5]);
        let outer_ree = RunArray::try_new(&outer_run_ends, &inner_ree).unwrap();

        let input = vec![Arc::new(outer_ree) as ArrayRef];
        assert_eq!(partition(&input).unwrap().ranges(), vec![0..2, 2..3, 3..5]);
    }

    #[test]
    fn test_partition_ree_with_dictionary_values() {
        // Dictionary values: keys [0, 1, 0], dict ["x", "y"] → logical ["x", "y", "x"]
        let dict_values = StringArray::from(vec!["x", "y"]);
        let keys = Int32Array::from(vec![0, 1, 0]);
        let dict = DictionaryArray::try_new(keys, Arc::new(dict_values)).unwrap();

        // REE wrapping dict: run_ends [2, 3, 5] → logical [dict[0], dict[0], dict[1], dict[2], dict[2]]
        // = ["x", "x", "y", "x", "x"]
        let run_ends = Int32Array::from(vec![2, 3, 5]);
        let ree = RunArray::try_new(&run_ends, &dict).unwrap();
        let input = vec![Arc::new(ree) as ArrayRef];
        assert_eq!(partition(&input).unwrap().ranges(), vec![0..2, 2..3, 3..5],);
    }

    #[test]
    fn test_partition_dictionary() {
        let values = StringArray::from(vec!["x", "y"]);
        let keys = Int32Array::from(vec![0, 0, 1, 0, 0]);
        let dict = DictionaryArray::try_new(keys, Arc::new(values)).unwrap();
        // logical: ["x", "x", "y", "x", "x"]
        let input = vec![Arc::new(dict) as _];
        assert_eq!(partition(&input).unwrap().ranges(), vec![0..2, 2..3, 3..5],);
    }

    #[test]
    fn test_partition_nested_dictionary() {
        let inner_values = StringArray::from(vec!["x", "y"]);
        let inner_keys = Int32Array::from(vec![0, 1, 0]);
        let inner_dict = DictionaryArray::try_new(inner_keys, Arc::new(inner_values)).unwrap();

        // Outer dict keys index into inner dict's logical values: ["x", "y", "x"]
        // keys [0, 0, 1, 2, 2] → logical ["x", "x", "y", "x", "x"]
        let outer_keys = Int32Array::from(vec![0, 0, 1, 2, 2]);
        let outer_dict = DictionaryArray::try_new(outer_keys, Arc::new(inner_dict)).unwrap();
        let input = vec![Arc::new(outer_dict) as ArrayRef];
        assert_eq!(partition(&input).unwrap().ranges(), vec![0..2, 2..3, 3..5],);
    }

    #[test]
    fn test_partition_dictionary_with_ree_values() {
        // REE values: run_ends [2, 3], values ["x", "y"] → logical ["x", "x", "y"]
        let run_ends = Int32Array::from(vec![2, 3]);
        let str_values = StringArray::from(vec!["x", "y"]);
        let ree = RunArray::try_new(&run_ends, &str_values).unwrap();

        // Dictionary keys index into the REE's logical values
        // keys [0, 0, 2, 0, 0] → logical ["x", "x", "y", "x", "x"]
        let keys = Int32Array::from(vec![0, 0, 2, 0, 0]);
        let dict = DictionaryArray::try_new(keys, Arc::new(ree)).unwrap();
        let input = vec![Arc::new(dict) as ArrayRef];
        assert_eq!(partition(&input).unwrap().ranges(), vec![0..2, 2..3, 3..5],);
    }

    #[test]
    fn test_partition_nested() {
        let input = vec![
            Arc::new(
                StructArray::try_from(vec![(
                    "f1",
                    Arc::new(Int64Array::from(vec![
                        None,
                        None,
                        Some(1),
                        Some(2),
                        Some(2),
                        Some(2),
                        Some(3),
                        Some(4),
                    ])) as _,
                )])
                .unwrap(),
            ) as _,
            Arc::new(Int64Array::from(vec![1, 1, 1, 2, 3, 3, 3, 4])) as _,
        ];
        assert_eq!(
            partition(&input).unwrap().ranges(),
            vec![0..2, 2..3, 3..4, 4..6, 6..7, 7..8]
        )
    }
}