parquet/arrow/arrow_reader/
selection.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use arrow_array::{Array, BooleanArray};
19use arrow_select::filter::SlicesIterator;
20use std::cmp::Ordering;
21use std::collections::VecDeque;
22use std::ops::Range;
23
24/// [`RowSelection`] is a collection of [`RowSelector`] used to skip rows when
25/// scanning a parquet file
26#[derive(Debug, Clone, Copy, Eq, PartialEq)]
27pub struct RowSelector {
28    /// The number of rows
29    pub row_count: usize,
30
31    /// If true, skip `row_count` rows
32    pub skip: bool,
33}
34
35impl RowSelector {
36    /// Select `row_count` rows
37    pub fn select(row_count: usize) -> Self {
38        Self {
39            row_count,
40            skip: false,
41        }
42    }
43
44    /// Skip `row_count` rows
45    pub fn skip(row_count: usize) -> Self {
46        Self {
47            row_count,
48            skip: true,
49        }
50    }
51}
52
53/// [`RowSelection`] allows selecting or skipping a provided number of rows
54/// when scanning the parquet file.
55///
56/// This is applied prior to reading column data, and can therefore
57/// be used to skip IO to fetch data into memory
58///
59/// A typical use-case would be using the [`PageIndex`] to filter out rows
60/// that don't satisfy a predicate
61///
62/// # Example
63/// ```
64/// use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
65///
66/// let selectors = vec![
67///     RowSelector::skip(5),
68///     RowSelector::select(5),
69///     RowSelector::select(5),
70///     RowSelector::skip(5),
71/// ];
72///
73/// // Creating a selection will combine adjacent selectors
74/// let selection: RowSelection = selectors.into();
75///
76/// let expected = vec![
77///     RowSelector::skip(5),
78///     RowSelector::select(10),
79///     RowSelector::skip(5),
80/// ];
81///
82/// let actual: Vec<RowSelector> = selection.into();
83/// assert_eq!(actual, expected);
84///
85/// // you can also create a selection from consecutive ranges
86/// let ranges = vec![5..10, 10..15];
87/// let selection =
88///   RowSelection::from_consecutive_ranges(ranges.into_iter(), 20);
89/// let actual: Vec<RowSelector> = selection.into();
90/// assert_eq!(actual, expected);
91/// ```
92///
93/// A [`RowSelection`] maintains the following invariants:
94///
95/// * It contains no [`RowSelector`] of 0 rows
96/// * Consecutive [`RowSelector`]s alternate skipping or selecting rows
97///
98/// [`PageIndex`]: crate::file::page_index::index::PageIndex
99#[derive(Debug, Clone, Default, Eq, PartialEq)]
100pub struct RowSelection {
101    selectors: Vec<RowSelector>,
102}
103
104impl RowSelection {
105    /// Creates a [`RowSelection`] from a slice of [`BooleanArray`]
106    ///
107    /// # Panic
108    ///
109    /// Panics if any of the [`BooleanArray`] contain nulls
110    pub fn from_filters(filters: &[BooleanArray]) -> Self {
111        let mut next_offset = 0;
112        let total_rows = filters.iter().map(|x| x.len()).sum();
113
114        let iter = filters.iter().flat_map(|filter| {
115            let offset = next_offset;
116            next_offset += filter.len();
117            assert_eq!(filter.null_count(), 0);
118            SlicesIterator::new(filter).map(move |(start, end)| start + offset..end + offset)
119        });
120
121        Self::from_consecutive_ranges(iter, total_rows)
122    }
123
124    /// Creates a [`RowSelection`] from an iterator of consecutive ranges to keep
125    pub fn from_consecutive_ranges<I: Iterator<Item = Range<usize>>>(
126        ranges: I,
127        total_rows: usize,
128    ) -> Self {
129        let mut selectors: Vec<RowSelector> = Vec::with_capacity(ranges.size_hint().0);
130        let mut last_end = 0;
131        for range in ranges {
132            let len = range.end - range.start;
133            if len == 0 {
134                continue;
135            }
136
137            match range.start.cmp(&last_end) {
138                Ordering::Equal => match selectors.last_mut() {
139                    Some(last) => last.row_count = last.row_count.checked_add(len).unwrap(),
140                    None => selectors.push(RowSelector::select(len)),
141                },
142                Ordering::Greater => {
143                    selectors.push(RowSelector::skip(range.start - last_end));
144                    selectors.push(RowSelector::select(len))
145                }
146                Ordering::Less => panic!("out of order"),
147            }
148            last_end = range.end;
149        }
150
151        if last_end != total_rows {
152            selectors.push(RowSelector::skip(total_rows - last_end))
153        }
154
155        Self { selectors }
156    }
157
158    /// Given an offset index, return the byte ranges for all data pages selected by `self`
159    ///
160    /// This is useful for determining what byte ranges to fetch from underlying storage
161    ///
162    /// Note: this method does not make any effort to combine consecutive ranges, nor coalesce
163    /// ranges that are close together. This is instead delegated to the IO subsystem to optimise,
164    /// e.g. [`ObjectStore::get_ranges`](object_store::ObjectStore::get_ranges)
165    pub fn scan_ranges(&self, page_locations: &[crate::format::PageLocation]) -> Vec<Range<u64>> {
166        let mut ranges: Vec<Range<u64>> = vec![];
167        let mut row_offset = 0;
168
169        let mut pages = page_locations.iter().peekable();
170        let mut selectors = self.selectors.iter().cloned();
171        let mut current_selector = selectors.next();
172        let mut current_page = pages.next();
173
174        let mut current_page_included = false;
175
176        while let Some((selector, page)) = current_selector.as_mut().zip(current_page) {
177            if !(selector.skip || current_page_included) {
178                let start = page.offset as u64;
179                let end = start + page.compressed_page_size as u64;
180                ranges.push(start..end);
181                current_page_included = true;
182            }
183
184            if let Some(next_page) = pages.peek() {
185                if row_offset + selector.row_count > next_page.first_row_index as usize {
186                    let remaining_in_page = next_page.first_row_index as usize - row_offset;
187                    selector.row_count -= remaining_in_page;
188                    row_offset += remaining_in_page;
189                    current_page = pages.next();
190                    current_page_included = false;
191
192                    continue;
193                } else {
194                    if row_offset + selector.row_count == next_page.first_row_index as usize {
195                        current_page = pages.next();
196                        current_page_included = false;
197                    }
198                    row_offset += selector.row_count;
199                    current_selector = selectors.next();
200                }
201            } else {
202                if !(selector.skip || current_page_included) {
203                    let start = page.offset as u64;
204                    let end = start + page.compressed_page_size as u64;
205                    ranges.push(start..end);
206                }
207                current_selector = selectors.next()
208            }
209        }
210
211        ranges
212    }
213
214    /// Splits off the first `row_count` from this [`RowSelection`]
215    pub fn split_off(&mut self, row_count: usize) -> Self {
216        let mut total_count = 0;
217
218        // Find the index where the selector exceeds the row count
219        let find = self.selectors.iter().position(|selector| {
220            total_count += selector.row_count;
221            total_count > row_count
222        });
223
224        let split_idx = match find {
225            Some(idx) => idx,
226            None => {
227                let selectors = std::mem::take(&mut self.selectors);
228                return Self { selectors };
229            }
230        };
231
232        let mut remaining = self.selectors.split_off(split_idx);
233
234        // Always present as `split_idx < self.selectors.len`
235        let next = remaining.first_mut().unwrap();
236        let overflow = total_count - row_count;
237
238        if next.row_count != overflow {
239            self.selectors.push(RowSelector {
240                row_count: next.row_count - overflow,
241                skip: next.skip,
242            })
243        }
244        next.row_count = overflow;
245
246        std::mem::swap(&mut remaining, &mut self.selectors);
247        Self {
248            selectors: remaining,
249        }
250    }
251    /// returns a [`RowSelection`] representing rows that are selected in both
252    /// input [`RowSelection`]s.
253    ///
254    /// This is equivalent to the logical `AND` / conjunction of the two
255    /// selections.
256    ///
257    /// # Example
258    /// If `N` means the row is not selected, and `Y` means it is
259    /// selected:
260    ///
261    /// ```text
262    /// self:     NNNNNNNNNNNNYYYYYYYYYYYYYYYYYYYYYYNNNYYYYY
263    /// other:                YYYYYNNNNYYYYYYYYYYYYY   YYNNN
264    ///
265    /// returned: NNNNNNNNNNNNYYYYYNNNNYYYYYYYYYYYYYNNNYYNNN
266    /// ```
267    ///
268    /// # Panics
269    ///
270    /// Panics if `other` does not have a length equal to the number of rows selected
271    /// by this RowSelection
272    ///
273    pub fn and_then(&self, other: &Self) -> Self {
274        let mut selectors = vec![];
275        let mut first = self.selectors.iter().cloned().peekable();
276        let mut second = other.selectors.iter().cloned().peekable();
277
278        let mut to_skip = 0;
279        while let Some(b) = second.peek_mut() {
280            let a = first
281                .peek_mut()
282                .expect("selection exceeds the number of selected rows");
283
284            if b.row_count == 0 {
285                second.next().unwrap();
286                continue;
287            }
288
289            if a.row_count == 0 {
290                first.next().unwrap();
291                continue;
292            }
293
294            if a.skip {
295                // Records were skipped when producing second
296                to_skip += a.row_count;
297                first.next().unwrap();
298                continue;
299            }
300
301            let skip = b.skip;
302            let to_process = a.row_count.min(b.row_count);
303
304            a.row_count -= to_process;
305            b.row_count -= to_process;
306
307            match skip {
308                true => to_skip += to_process,
309                false => {
310                    if to_skip != 0 {
311                        selectors.push(RowSelector::skip(to_skip));
312                        to_skip = 0;
313                    }
314                    selectors.push(RowSelector::select(to_process))
315                }
316            }
317        }
318
319        for v in first {
320            if v.row_count != 0 {
321                assert!(
322                    v.skip,
323                    "selection contains less than the number of selected rows"
324                );
325                to_skip += v.row_count
326            }
327        }
328
329        if to_skip != 0 {
330            selectors.push(RowSelector::skip(to_skip));
331        }
332
333        Self { selectors }
334    }
335
336    /// Compute the intersection of two [`RowSelection`]
337    /// For example:
338    /// self:      NNYYYYNNYYNYN
339    /// other:     NYNNNNNNY
340    ///
341    /// returned:  NNNNNNNNYYNYN
342    pub fn intersection(&self, other: &Self) -> Self {
343        intersect_row_selections(&self.selectors, &other.selectors)
344    }
345
346    /// Compute the union of two [`RowSelection`]
347    /// For example:
348    /// self:      NNYYYYNNYYNYN
349    /// other:     NYNNNNNNN
350    ///
351    /// returned:  NYYYYYNNYYNYN
352    pub fn union(&self, other: &Self) -> Self {
353        union_row_selections(&self.selectors, &other.selectors)
354    }
355
356    /// Returns `true` if this [`RowSelection`] selects any rows
357    pub fn selects_any(&self) -> bool {
358        self.selectors.iter().any(|x| !x.skip)
359    }
360
361    /// Trims this [`RowSelection`] removing any trailing skips
362    pub(crate) fn trim(mut self) -> Self {
363        while self.selectors.last().map(|x| x.skip).unwrap_or(false) {
364            self.selectors.pop();
365        }
366        self
367    }
368
369    /// Applies an offset to this [`RowSelection`], skipping the first `offset` selected rows
370    pub(crate) fn offset(mut self, offset: usize) -> Self {
371        if offset == 0 {
372            return self;
373        }
374
375        let mut selected_count = 0;
376        let mut skipped_count = 0;
377
378        // Find the index where the selector exceeds the row count
379        let find = self
380            .selectors
381            .iter()
382            .position(|selector| match selector.skip {
383                true => {
384                    skipped_count += selector.row_count;
385                    false
386                }
387                false => {
388                    selected_count += selector.row_count;
389                    selected_count > offset
390                }
391            });
392
393        let split_idx = match find {
394            Some(idx) => idx,
395            None => {
396                self.selectors.clear();
397                return self;
398            }
399        };
400
401        let mut selectors = Vec::with_capacity(self.selectors.len() - split_idx + 1);
402        selectors.push(RowSelector::skip(skipped_count + offset));
403        selectors.push(RowSelector::select(selected_count - offset));
404        selectors.extend_from_slice(&self.selectors[split_idx + 1..]);
405
406        Self { selectors }
407    }
408
409    /// Limit this [`RowSelection`] to only select `limit` rows
410    pub(crate) fn limit(mut self, mut limit: usize) -> Self {
411        if limit == 0 {
412            self.selectors.clear();
413        }
414
415        for (idx, selection) in self.selectors.iter_mut().enumerate() {
416            if !selection.skip {
417                if selection.row_count >= limit {
418                    selection.row_count = limit;
419                    self.selectors.truncate(idx + 1);
420                    break;
421                } else {
422                    limit -= selection.row_count;
423                }
424            }
425        }
426        self
427    }
428
429    /// Returns an iterator over the [`RowSelector`]s for this
430    /// [`RowSelection`].
431    pub fn iter(&self) -> impl Iterator<Item = &RowSelector> {
432        self.selectors.iter()
433    }
434
435    /// Returns the number of selected rows
436    pub fn row_count(&self) -> usize {
437        self.iter().filter(|s| !s.skip).map(|s| s.row_count).sum()
438    }
439
440    /// Returns the number of de-selected rows
441    pub fn skipped_row_count(&self) -> usize {
442        self.iter().filter(|s| s.skip).map(|s| s.row_count).sum()
443    }
444
445    /// Expands the selection to align with batch boundaries.
446    /// This is needed when using cached array readers to ensure that
447    /// the cached data covers full batches.
448    #[cfg(feature = "async")]
449    pub(crate) fn expand_to_batch_boundaries(&self, batch_size: usize, total_rows: usize) -> Self {
450        if batch_size == 0 {
451            return self.clone();
452        }
453
454        let mut expanded_ranges = Vec::new();
455        let mut row_offset = 0;
456
457        for selector in &self.selectors {
458            if selector.skip {
459                row_offset += selector.row_count;
460            } else {
461                let start = row_offset;
462                let end = row_offset + selector.row_count;
463
464                // Expand start to batch boundary
465                let expanded_start = (start / batch_size) * batch_size;
466                // Expand end to batch boundary
467                let expanded_end = end.div_ceil(batch_size) * batch_size;
468                let expanded_end = expanded_end.min(total_rows);
469
470                expanded_ranges.push(expanded_start..expanded_end);
471                row_offset += selector.row_count;
472            }
473        }
474
475        // Sort ranges by start position
476        expanded_ranges.sort_by_key(|range| range.start);
477
478        // Merge overlapping or consecutive ranges
479        let mut merged_ranges: Vec<Range<usize>> = Vec::new();
480        for range in expanded_ranges {
481            if let Some(last) = merged_ranges.last_mut() {
482                if range.start <= last.end {
483                    // Overlapping or consecutive - merge them
484                    last.end = last.end.max(range.end);
485                } else {
486                    // No overlap - add new range
487                    merged_ranges.push(range);
488                }
489            } else {
490                // First range
491                merged_ranges.push(range);
492            }
493        }
494
495        Self::from_consecutive_ranges(merged_ranges.into_iter(), total_rows)
496    }
497}
498
499impl From<Vec<RowSelector>> for RowSelection {
500    fn from(selectors: Vec<RowSelector>) -> Self {
501        selectors.into_iter().collect()
502    }
503}
504
505impl FromIterator<RowSelector> for RowSelection {
506    fn from_iter<T: IntoIterator<Item = RowSelector>>(iter: T) -> Self {
507        let iter = iter.into_iter();
508
509        // Capacity before filter
510        let mut selectors = Vec::with_capacity(iter.size_hint().0);
511
512        let mut filtered = iter.filter(|x| x.row_count != 0);
513        if let Some(x) = filtered.next() {
514            selectors.push(x);
515        }
516
517        for s in filtered {
518            if s.row_count == 0 {
519                continue;
520            }
521
522            // Combine consecutive selectors
523            let last = selectors.last_mut().unwrap();
524            if last.skip == s.skip {
525                last.row_count = last.row_count.checked_add(s.row_count).unwrap();
526            } else {
527                selectors.push(s)
528            }
529        }
530
531        Self { selectors }
532    }
533}
534
535impl From<RowSelection> for Vec<RowSelector> {
536    fn from(r: RowSelection) -> Self {
537        r.selectors
538    }
539}
540
541impl From<RowSelection> for VecDeque<RowSelector> {
542    fn from(r: RowSelection) -> Self {
543        r.selectors.into()
544    }
545}
546
547/// Combine two lists of `RowSelection` return the intersection of them
548/// For example:
549/// self:      NNYYYYNNYYNYN
550/// other:     NYNNNNNNY
551///
552/// returned:  NNNNNNNNYYNYN
553fn intersect_row_selections(left: &[RowSelector], right: &[RowSelector]) -> RowSelection {
554    let mut l_iter = left.iter().copied().peekable();
555    let mut r_iter = right.iter().copied().peekable();
556
557    let iter = std::iter::from_fn(move || {
558        loop {
559            let l = l_iter.peek_mut();
560            let r = r_iter.peek_mut();
561
562            match (l, r) {
563                (Some(a), _) if a.row_count == 0 => {
564                    l_iter.next().unwrap();
565                }
566                (_, Some(b)) if b.row_count == 0 => {
567                    r_iter.next().unwrap();
568                }
569                (Some(l), Some(r)) => {
570                    return match (l.skip, r.skip) {
571                        // Keep both ranges
572                        (false, false) => {
573                            if l.row_count < r.row_count {
574                                r.row_count -= l.row_count;
575                                l_iter.next()
576                            } else {
577                                l.row_count -= r.row_count;
578                                r_iter.next()
579                            }
580                        }
581                        // skip at least one
582                        _ => {
583                            if l.row_count < r.row_count {
584                                let skip = l.row_count;
585                                r.row_count -= l.row_count;
586                                l_iter.next();
587                                Some(RowSelector::skip(skip))
588                            } else {
589                                let skip = r.row_count;
590                                l.row_count -= skip;
591                                r_iter.next();
592                                Some(RowSelector::skip(skip))
593                            }
594                        }
595                    };
596                }
597                (Some(_), None) => return l_iter.next(),
598                (None, Some(_)) => return r_iter.next(),
599                (None, None) => return None,
600            }
601        }
602    });
603
604    iter.collect()
605}
606
607/// Combine two lists of `RowSelector` return the union of them
608/// For example:
609/// self:      NNYYYYNNYYNYN
610/// other:     NYNNNNNNY
611///
612/// returned:  NYYYYYNNYYNYN
613///
614/// This can be removed from here once RowSelection::union is in parquet::arrow
615fn union_row_selections(left: &[RowSelector], right: &[RowSelector]) -> RowSelection {
616    let mut l_iter = left.iter().copied().peekable();
617    let mut r_iter = right.iter().copied().peekable();
618
619    let iter = std::iter::from_fn(move || {
620        loop {
621            let l = l_iter.peek_mut();
622            let r = r_iter.peek_mut();
623
624            match (l, r) {
625                (Some(a), _) if a.row_count == 0 => {
626                    l_iter.next().unwrap();
627                }
628                (_, Some(b)) if b.row_count == 0 => {
629                    r_iter.next().unwrap();
630                }
631                (Some(l), Some(r)) => {
632                    return match (l.skip, r.skip) {
633                        // Skip both ranges
634                        (true, true) => {
635                            if l.row_count < r.row_count {
636                                let skip = l.row_count;
637                                r.row_count -= l.row_count;
638                                l_iter.next();
639                                Some(RowSelector::skip(skip))
640                            } else {
641                                let skip = r.row_count;
642                                l.row_count -= skip;
643                                r_iter.next();
644                                Some(RowSelector::skip(skip))
645                            }
646                        }
647                        // Keep rows from left
648                        (false, true) => {
649                            if l.row_count < r.row_count {
650                                r.row_count -= l.row_count;
651                                l_iter.next()
652                            } else {
653                                let r_row_count = r.row_count;
654                                l.row_count -= r_row_count;
655                                r_iter.next();
656                                Some(RowSelector::select(r_row_count))
657                            }
658                        }
659                        // Keep rows from right
660                        (true, false) => {
661                            if l.row_count < r.row_count {
662                                let l_row_count = l.row_count;
663                                r.row_count -= l_row_count;
664                                l_iter.next();
665                                Some(RowSelector::select(l_row_count))
666                            } else {
667                                l.row_count -= r.row_count;
668                                r_iter.next()
669                            }
670                        }
671                        // Keep at least one
672                        _ => {
673                            if l.row_count < r.row_count {
674                                r.row_count -= l.row_count;
675                                l_iter.next()
676                            } else {
677                                l.row_count -= r.row_count;
678                                r_iter.next()
679                            }
680                        }
681                    };
682                }
683                (Some(_), None) => return l_iter.next(),
684                (None, Some(_)) => return r_iter.next(),
685                (None, None) => return None,
686            }
687        }
688    });
689
690    iter.collect()
691}
692
693#[cfg(test)]
694mod tests {
695    use super::*;
696    use crate::format::PageLocation;
697    use rand::{rng, Rng};
698
699    #[test]
700    fn test_from_filters() {
701        let filters = vec![
702            BooleanArray::from(vec![false, false, false, true, true, true, true]),
703            BooleanArray::from(vec![true, true, false, false, true, true, true]),
704            BooleanArray::from(vec![false, false, false, false]),
705            BooleanArray::from(Vec::<bool>::new()),
706        ];
707
708        let selection = RowSelection::from_filters(&filters[..1]);
709        assert!(selection.selects_any());
710        assert_eq!(
711            selection.selectors,
712            vec![RowSelector::skip(3), RowSelector::select(4)]
713        );
714
715        let selection = RowSelection::from_filters(&filters[..2]);
716        assert!(selection.selects_any());
717        assert_eq!(
718            selection.selectors,
719            vec![
720                RowSelector::skip(3),
721                RowSelector::select(6),
722                RowSelector::skip(2),
723                RowSelector::select(3)
724            ]
725        );
726
727        let selection = RowSelection::from_filters(&filters);
728        assert!(selection.selects_any());
729        assert_eq!(
730            selection.selectors,
731            vec![
732                RowSelector::skip(3),
733                RowSelector::select(6),
734                RowSelector::skip(2),
735                RowSelector::select(3),
736                RowSelector::skip(4)
737            ]
738        );
739
740        let selection = RowSelection::from_filters(&filters[2..3]);
741        assert!(!selection.selects_any());
742        assert_eq!(selection.selectors, vec![RowSelector::skip(4)]);
743    }
744
745    #[test]
746    fn test_split_off() {
747        let mut selection = RowSelection::from(vec![
748            RowSelector::skip(34),
749            RowSelector::select(12),
750            RowSelector::skip(3),
751            RowSelector::select(35),
752        ]);
753
754        let split = selection.split_off(34);
755        assert_eq!(split.selectors, vec![RowSelector::skip(34)]);
756        assert_eq!(
757            selection.selectors,
758            vec![
759                RowSelector::select(12),
760                RowSelector::skip(3),
761                RowSelector::select(35)
762            ]
763        );
764
765        let split = selection.split_off(5);
766        assert_eq!(split.selectors, vec![RowSelector::select(5)]);
767        assert_eq!(
768            selection.selectors,
769            vec![
770                RowSelector::select(7),
771                RowSelector::skip(3),
772                RowSelector::select(35)
773            ]
774        );
775
776        let split = selection.split_off(8);
777        assert_eq!(
778            split.selectors,
779            vec![RowSelector::select(7), RowSelector::skip(1)]
780        );
781        assert_eq!(
782            selection.selectors,
783            vec![RowSelector::skip(2), RowSelector::select(35)]
784        );
785
786        let split = selection.split_off(200);
787        assert_eq!(
788            split.selectors,
789            vec![RowSelector::skip(2), RowSelector::select(35)]
790        );
791        assert!(selection.selectors.is_empty());
792    }
793
794    #[test]
795    fn test_offset() {
796        let selection = RowSelection::from(vec![
797            RowSelector::select(5),
798            RowSelector::skip(23),
799            RowSelector::select(7),
800            RowSelector::skip(33),
801            RowSelector::select(6),
802        ]);
803
804        let selection = selection.offset(2);
805        assert_eq!(
806            selection.selectors,
807            vec![
808                RowSelector::skip(2),
809                RowSelector::select(3),
810                RowSelector::skip(23),
811                RowSelector::select(7),
812                RowSelector::skip(33),
813                RowSelector::select(6),
814            ]
815        );
816
817        let selection = selection.offset(5);
818        assert_eq!(
819            selection.selectors,
820            vec![
821                RowSelector::skip(30),
822                RowSelector::select(5),
823                RowSelector::skip(33),
824                RowSelector::select(6),
825            ]
826        );
827
828        let selection = selection.offset(3);
829        assert_eq!(
830            selection.selectors,
831            vec![
832                RowSelector::skip(33),
833                RowSelector::select(2),
834                RowSelector::skip(33),
835                RowSelector::select(6),
836            ]
837        );
838
839        let selection = selection.offset(2);
840        assert_eq!(
841            selection.selectors,
842            vec![RowSelector::skip(68), RowSelector::select(6),]
843        );
844
845        let selection = selection.offset(3);
846        assert_eq!(
847            selection.selectors,
848            vec![RowSelector::skip(71), RowSelector::select(3),]
849        );
850    }
851
852    #[test]
853    fn test_and() {
854        let mut a = RowSelection::from(vec![
855            RowSelector::skip(12),
856            RowSelector::select(23),
857            RowSelector::skip(3),
858            RowSelector::select(5),
859        ]);
860
861        let b = RowSelection::from(vec![
862            RowSelector::select(5),
863            RowSelector::skip(4),
864            RowSelector::select(15),
865            RowSelector::skip(4),
866        ]);
867
868        let mut expected = RowSelection::from(vec![
869            RowSelector::skip(12),
870            RowSelector::select(5),
871            RowSelector::skip(4),
872            RowSelector::select(14),
873            RowSelector::skip(3),
874            RowSelector::select(1),
875            RowSelector::skip(4),
876        ]);
877
878        assert_eq!(a.and_then(&b), expected);
879
880        a.split_off(7);
881        expected.split_off(7);
882        assert_eq!(a.and_then(&b), expected);
883
884        let a = RowSelection::from(vec![RowSelector::select(5), RowSelector::skip(3)]);
885
886        let b = RowSelection::from(vec![
887            RowSelector::select(2),
888            RowSelector::skip(1),
889            RowSelector::select(1),
890            RowSelector::skip(1),
891        ]);
892
893        assert_eq!(
894            a.and_then(&b).selectors,
895            vec![
896                RowSelector::select(2),
897                RowSelector::skip(1),
898                RowSelector::select(1),
899                RowSelector::skip(4)
900            ]
901        );
902    }
903
904    #[test]
905    fn test_combine() {
906        let a = vec![
907            RowSelector::skip(3),
908            RowSelector::skip(3),
909            RowSelector::select(10),
910            RowSelector::skip(4),
911        ];
912
913        let b = vec![
914            RowSelector::skip(3),
915            RowSelector::skip(3),
916            RowSelector::select(10),
917            RowSelector::skip(4),
918            RowSelector::skip(0),
919        ];
920
921        let c = vec![
922            RowSelector::skip(2),
923            RowSelector::skip(4),
924            RowSelector::select(3),
925            RowSelector::select(3),
926            RowSelector::select(4),
927            RowSelector::skip(3),
928            RowSelector::skip(1),
929            RowSelector::skip(0),
930        ];
931
932        let expected = RowSelection::from(vec![
933            RowSelector::skip(6),
934            RowSelector::select(10),
935            RowSelector::skip(4),
936        ]);
937
938        assert_eq!(RowSelection::from_iter(a), expected);
939        assert_eq!(RowSelection::from_iter(b), expected);
940        assert_eq!(RowSelection::from_iter(c), expected);
941    }
942
943    #[test]
944    fn test_combine_2elements() {
945        let a = vec![RowSelector::select(10), RowSelector::select(5)];
946        let a_expect = vec![RowSelector::select(15)];
947        assert_eq!(RowSelection::from_iter(a).selectors, a_expect);
948
949        let b = vec![RowSelector::select(10), RowSelector::skip(5)];
950        let b_expect = vec![RowSelector::select(10), RowSelector::skip(5)];
951        assert_eq!(RowSelection::from_iter(b).selectors, b_expect);
952
953        let c = vec![RowSelector::skip(10), RowSelector::select(5)];
954        let c_expect = vec![RowSelector::skip(10), RowSelector::select(5)];
955        assert_eq!(RowSelection::from_iter(c).selectors, c_expect);
956
957        let d = vec![RowSelector::skip(10), RowSelector::skip(5)];
958        let d_expect = vec![RowSelector::skip(15)];
959        assert_eq!(RowSelection::from_iter(d).selectors, d_expect);
960    }
961
962    #[test]
963    fn test_from_one_and_empty() {
964        let a = vec![RowSelector::select(10)];
965        let selection1 = RowSelection::from(a.clone());
966        assert_eq!(selection1.selectors, a);
967
968        let b = vec![];
969        let selection1 = RowSelection::from(b.clone());
970        assert_eq!(selection1.selectors, b)
971    }
972
973    #[test]
974    #[should_panic(expected = "selection exceeds the number of selected rows")]
975    fn test_and_longer() {
976        let a = RowSelection::from(vec![
977            RowSelector::select(3),
978            RowSelector::skip(33),
979            RowSelector::select(3),
980            RowSelector::skip(33),
981        ]);
982        let b = RowSelection::from(vec![RowSelector::select(36)]);
983        a.and_then(&b);
984    }
985
986    #[test]
987    #[should_panic(expected = "selection contains less than the number of selected rows")]
988    fn test_and_shorter() {
989        let a = RowSelection::from(vec![
990            RowSelector::select(3),
991            RowSelector::skip(33),
992            RowSelector::select(3),
993            RowSelector::skip(33),
994        ]);
995        let b = RowSelection::from(vec![RowSelector::select(3)]);
996        a.and_then(&b);
997    }
998
999    #[test]
1000    fn test_intersect_row_selection_and_combine() {
1001        // a size equal b size
1002        let a = vec![
1003            RowSelector::select(5),
1004            RowSelector::skip(4),
1005            RowSelector::select(1),
1006        ];
1007        let b = vec![
1008            RowSelector::select(8),
1009            RowSelector::skip(1),
1010            RowSelector::select(1),
1011        ];
1012
1013        let res = intersect_row_selections(&a, &b);
1014        assert_eq!(
1015            res.selectors,
1016            vec![
1017                RowSelector::select(5),
1018                RowSelector::skip(4),
1019                RowSelector::select(1),
1020            ],
1021        );
1022
1023        // a size larger than b size
1024        let a = vec![
1025            RowSelector::select(3),
1026            RowSelector::skip(33),
1027            RowSelector::select(3),
1028            RowSelector::skip(33),
1029        ];
1030        let b = vec![RowSelector::select(36), RowSelector::skip(36)];
1031        let res = intersect_row_selections(&a, &b);
1032        assert_eq!(
1033            res.selectors,
1034            vec![RowSelector::select(3), RowSelector::skip(69)]
1035        );
1036
1037        // a size less than b size
1038        let a = vec![RowSelector::select(3), RowSelector::skip(7)];
1039        let b = vec![
1040            RowSelector::select(2),
1041            RowSelector::skip(2),
1042            RowSelector::select(2),
1043            RowSelector::skip(2),
1044            RowSelector::select(2),
1045        ];
1046        let res = intersect_row_selections(&a, &b);
1047        assert_eq!(
1048            res.selectors,
1049            vec![RowSelector::select(2), RowSelector::skip(8)]
1050        );
1051
1052        let a = vec![RowSelector::select(3), RowSelector::skip(7)];
1053        let b = vec![
1054            RowSelector::select(2),
1055            RowSelector::skip(2),
1056            RowSelector::select(2),
1057            RowSelector::skip(2),
1058            RowSelector::select(2),
1059        ];
1060        let res = intersect_row_selections(&a, &b);
1061        assert_eq!(
1062            res.selectors,
1063            vec![RowSelector::select(2), RowSelector::skip(8)]
1064        );
1065    }
1066
1067    #[test]
1068    fn test_and_fuzz() {
1069        let mut rand = rng();
1070        for _ in 0..100 {
1071            let a_len = rand.random_range(10..100);
1072            let a_bools: Vec<_> = (0..a_len).map(|_| rand.random_bool(0.2)).collect();
1073            let a = RowSelection::from_filters(&[BooleanArray::from(a_bools.clone())]);
1074
1075            let b_len: usize = a_bools.iter().map(|x| *x as usize).sum();
1076            let b_bools: Vec<_> = (0..b_len).map(|_| rand.random_bool(0.8)).collect();
1077            let b = RowSelection::from_filters(&[BooleanArray::from(b_bools.clone())]);
1078
1079            let mut expected_bools = vec![false; a_len];
1080
1081            let mut iter_b = b_bools.iter();
1082            for (idx, b) in a_bools.iter().enumerate() {
1083                if *b && *iter_b.next().unwrap() {
1084                    expected_bools[idx] = true;
1085                }
1086            }
1087
1088            let expected = RowSelection::from_filters(&[BooleanArray::from(expected_bools)]);
1089
1090            let total_rows: usize = expected.selectors.iter().map(|s| s.row_count).sum();
1091            assert_eq!(a_len, total_rows);
1092
1093            assert_eq!(a.and_then(&b), expected);
1094        }
1095    }
1096
1097    #[test]
1098    fn test_iter() {
1099        // use the iter() API to show it does what is expected and
1100        // avoid accidental deletion
1101        let selectors = vec![
1102            RowSelector::select(3),
1103            RowSelector::skip(33),
1104            RowSelector::select(4),
1105        ];
1106
1107        let round_tripped = RowSelection::from(selectors.clone())
1108            .iter()
1109            .cloned()
1110            .collect::<Vec<_>>();
1111        assert_eq!(selectors, round_tripped);
1112    }
1113
1114    #[test]
1115    fn test_limit() {
1116        // Limit to existing limit should no-op
1117        let selection = RowSelection::from(vec![RowSelector::select(10), RowSelector::skip(90)]);
1118        let limited = selection.limit(10);
1119        assert_eq!(RowSelection::from(vec![RowSelector::select(10)]), limited);
1120
1121        let selection = RowSelection::from(vec![
1122            RowSelector::select(10),
1123            RowSelector::skip(10),
1124            RowSelector::select(10),
1125            RowSelector::skip(10),
1126            RowSelector::select(10),
1127        ]);
1128
1129        let limited = selection.clone().limit(5);
1130        let expected = vec![RowSelector::select(5)];
1131        assert_eq!(limited.selectors, expected);
1132
1133        let limited = selection.clone().limit(15);
1134        let expected = vec![
1135            RowSelector::select(10),
1136            RowSelector::skip(10),
1137            RowSelector::select(5),
1138        ];
1139        assert_eq!(limited.selectors, expected);
1140
1141        let limited = selection.clone().limit(0);
1142        let expected = vec![];
1143        assert_eq!(limited.selectors, expected);
1144
1145        let limited = selection.clone().limit(30);
1146        let expected = vec![
1147            RowSelector::select(10),
1148            RowSelector::skip(10),
1149            RowSelector::select(10),
1150            RowSelector::skip(10),
1151            RowSelector::select(10),
1152        ];
1153        assert_eq!(limited.selectors, expected);
1154
1155        let limited = selection.limit(100);
1156        let expected = vec![
1157            RowSelector::select(10),
1158            RowSelector::skip(10),
1159            RowSelector::select(10),
1160            RowSelector::skip(10),
1161            RowSelector::select(10),
1162        ];
1163        assert_eq!(limited.selectors, expected);
1164    }
1165
1166    #[test]
1167    fn test_scan_ranges() {
1168        let index = vec![
1169            PageLocation {
1170                offset: 0,
1171                compressed_page_size: 10,
1172                first_row_index: 0,
1173            },
1174            PageLocation {
1175                offset: 10,
1176                compressed_page_size: 10,
1177                first_row_index: 10,
1178            },
1179            PageLocation {
1180                offset: 20,
1181                compressed_page_size: 10,
1182                first_row_index: 20,
1183            },
1184            PageLocation {
1185                offset: 30,
1186                compressed_page_size: 10,
1187                first_row_index: 30,
1188            },
1189            PageLocation {
1190                offset: 40,
1191                compressed_page_size: 10,
1192                first_row_index: 40,
1193            },
1194            PageLocation {
1195                offset: 50,
1196                compressed_page_size: 10,
1197                first_row_index: 50,
1198            },
1199            PageLocation {
1200                offset: 60,
1201                compressed_page_size: 10,
1202                first_row_index: 60,
1203            },
1204        ];
1205
1206        let selection = RowSelection::from(vec![
1207            // Skip first page
1208            RowSelector::skip(10),
1209            // Multiple selects in same page
1210            RowSelector::select(3),
1211            RowSelector::skip(3),
1212            RowSelector::select(4),
1213            // Select to page boundary
1214            RowSelector::skip(5),
1215            RowSelector::select(5),
1216            // Skip full page past page boundary
1217            RowSelector::skip(12),
1218            // Select across page boundaries
1219            RowSelector::select(12),
1220            // Skip final page
1221            RowSelector::skip(12),
1222        ]);
1223
1224        let ranges = selection.scan_ranges(&index);
1225
1226        // assert_eq!(mask, vec![false, true, true, false, true, true, false]);
1227        assert_eq!(ranges, vec![10..20, 20..30, 40..50, 50..60]);
1228
1229        let selection = RowSelection::from(vec![
1230            // Skip first page
1231            RowSelector::skip(10),
1232            // Multiple selects in same page
1233            RowSelector::select(3),
1234            RowSelector::skip(3),
1235            RowSelector::select(4),
1236            // Select to page boundary
1237            RowSelector::skip(5),
1238            RowSelector::select(5),
1239            // Skip full page past page boundary
1240            RowSelector::skip(12),
1241            // Select across page boundaries
1242            RowSelector::select(12),
1243            RowSelector::skip(1),
1244            // Select across page boundaries including final page
1245            RowSelector::select(8),
1246        ]);
1247
1248        let ranges = selection.scan_ranges(&index);
1249
1250        // assert_eq!(mask, vec![false, true, true, false, true, true, true]);
1251        assert_eq!(ranges, vec![10..20, 20..30, 40..50, 50..60, 60..70]);
1252
1253        let selection = RowSelection::from(vec![
1254            // Skip first page
1255            RowSelector::skip(10),
1256            // Multiple selects in same page
1257            RowSelector::select(3),
1258            RowSelector::skip(3),
1259            RowSelector::select(4),
1260            // Select to page boundary
1261            RowSelector::skip(5),
1262            RowSelector::select(5),
1263            // Skip full page past page boundary
1264            RowSelector::skip(12),
1265            // Select to final page boundary
1266            RowSelector::select(12),
1267            RowSelector::skip(1),
1268            // Skip across final page boundary
1269            RowSelector::skip(8),
1270            // Select from final page
1271            RowSelector::select(4),
1272        ]);
1273
1274        let ranges = selection.scan_ranges(&index);
1275
1276        // assert_eq!(mask, vec![false, true, true, false, true, true, true]);
1277        assert_eq!(ranges, vec![10..20, 20..30, 40..50, 50..60, 60..70]);
1278
1279        let selection = RowSelection::from(vec![
1280            // Skip first page
1281            RowSelector::skip(10),
1282            // Multiple selects in same page
1283            RowSelector::select(3),
1284            RowSelector::skip(3),
1285            RowSelector::select(4),
1286            // Select to remaining in page and first row of next page
1287            RowSelector::skip(5),
1288            RowSelector::select(6),
1289            // Skip remaining
1290            RowSelector::skip(50),
1291        ]);
1292
1293        let ranges = selection.scan_ranges(&index);
1294
1295        // assert_eq!(mask, vec![false, true, true, false, true, true, true]);
1296        assert_eq!(ranges, vec![10..20, 20..30, 30..40]);
1297    }
1298
1299    #[test]
1300    fn test_from_ranges() {
1301        let ranges = [1..3, 4..6, 6..6, 8..8, 9..10];
1302        let selection = RowSelection::from_consecutive_ranges(ranges.into_iter(), 10);
1303        assert_eq!(
1304            selection.selectors,
1305            vec![
1306                RowSelector::skip(1),
1307                RowSelector::select(2),
1308                RowSelector::skip(1),
1309                RowSelector::select(2),
1310                RowSelector::skip(3),
1311                RowSelector::select(1)
1312            ]
1313        );
1314
1315        let out_of_order_ranges = [1..3, 8..10, 4..7];
1316        let result = std::panic::catch_unwind(|| {
1317            RowSelection::from_consecutive_ranges(out_of_order_ranges.into_iter(), 10)
1318        });
1319        assert!(result.is_err());
1320    }
1321
1322    #[test]
1323    fn test_empty_selector() {
1324        let selection = RowSelection::from(vec![
1325            RowSelector::skip(0),
1326            RowSelector::select(2),
1327            RowSelector::skip(0),
1328            RowSelector::select(2),
1329        ]);
1330        assert_eq!(selection.selectors, vec![RowSelector::select(4)]);
1331
1332        let selection = RowSelection::from(vec![
1333            RowSelector::select(0),
1334            RowSelector::skip(2),
1335            RowSelector::select(0),
1336            RowSelector::skip(2),
1337        ]);
1338        assert_eq!(selection.selectors, vec![RowSelector::skip(4)]);
1339    }
1340
1341    #[test]
1342    fn test_intersection() {
1343        let selection = RowSelection::from(vec![RowSelector::select(1048576)]);
1344        let result = selection.intersection(&selection);
1345        assert_eq!(result, selection);
1346
1347        let a = RowSelection::from(vec![
1348            RowSelector::skip(10),
1349            RowSelector::select(10),
1350            RowSelector::skip(10),
1351            RowSelector::select(20),
1352        ]);
1353
1354        let b = RowSelection::from(vec![
1355            RowSelector::skip(20),
1356            RowSelector::select(20),
1357            RowSelector::skip(10),
1358        ]);
1359
1360        let result = a.intersection(&b);
1361        assert_eq!(
1362            result.selectors,
1363            vec![
1364                RowSelector::skip(30),
1365                RowSelector::select(10),
1366                RowSelector::skip(10)
1367            ]
1368        );
1369    }
1370
1371    #[test]
1372    fn test_union() {
1373        let selection = RowSelection::from(vec![RowSelector::select(1048576)]);
1374        let result = selection.union(&selection);
1375        assert_eq!(result, selection);
1376
1377        // NYNYY
1378        let a = RowSelection::from(vec![
1379            RowSelector::skip(10),
1380            RowSelector::select(10),
1381            RowSelector::skip(10),
1382            RowSelector::select(20),
1383        ]);
1384
1385        // NNYYNYN
1386        let b = RowSelection::from(vec![
1387            RowSelector::skip(20),
1388            RowSelector::select(20),
1389            RowSelector::skip(10),
1390            RowSelector::select(10),
1391            RowSelector::skip(10),
1392        ]);
1393
1394        let result = a.union(&b);
1395
1396        // NYYYYYN
1397        assert_eq!(
1398            result.iter().collect::<Vec<_>>(),
1399            vec![
1400                &RowSelector::skip(10),
1401                &RowSelector::select(50),
1402                &RowSelector::skip(10),
1403            ]
1404        );
1405    }
1406
1407    #[test]
1408    fn test_row_count() {
1409        let selection = RowSelection::from(vec![
1410            RowSelector::skip(34),
1411            RowSelector::select(12),
1412            RowSelector::skip(3),
1413            RowSelector::select(35),
1414        ]);
1415
1416        assert_eq!(selection.row_count(), 12 + 35);
1417        assert_eq!(selection.skipped_row_count(), 34 + 3);
1418
1419        let selection = RowSelection::from(vec![RowSelector::select(12), RowSelector::select(35)]);
1420
1421        assert_eq!(selection.row_count(), 12 + 35);
1422        assert_eq!(selection.skipped_row_count(), 0);
1423
1424        let selection = RowSelection::from(vec![RowSelector::skip(34), RowSelector::skip(3)]);
1425
1426        assert_eq!(selection.row_count(), 0);
1427        assert_eq!(selection.skipped_row_count(), 34 + 3);
1428
1429        let selection = RowSelection::from(vec![]);
1430
1431        assert_eq!(selection.row_count(), 0);
1432        assert_eq!(selection.skipped_row_count(), 0);
1433    }
1434}