parquet/arrow/arrow_reader/
selection.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use arrow_array::{Array, BooleanArray};
19use arrow_select::filter::SlicesIterator;
20use std::cmp::Ordering;
21use std::collections::VecDeque;
22use std::ops::Range;
23
24/// [`RowSelection`] is a collection of [`RowSelector`] used to skip rows when
25/// scanning a parquet file
26#[derive(Debug, Clone, Copy, Eq, PartialEq)]
27pub struct RowSelector {
28    /// The number of rows
29    pub row_count: usize,
30
31    /// If true, skip `row_count` rows
32    pub skip: bool,
33}
34
35impl RowSelector {
36    /// Select `row_count` rows
37    pub fn select(row_count: usize) -> Self {
38        Self {
39            row_count,
40            skip: false,
41        }
42    }
43
44    /// Skip `row_count` rows
45    pub fn skip(row_count: usize) -> Self {
46        Self {
47            row_count,
48            skip: true,
49        }
50    }
51}
52
53/// [`RowSelection`] allows selecting or skipping a provided number of rows
54/// when scanning the parquet file.
55///
56/// This is applied prior to reading column data, and can therefore
57/// be used to skip IO to fetch data into memory
58///
59/// A typical use-case would be using the [`PageIndex`] to filter out rows
60/// that don't satisfy a predicate
61///
62/// # Example
63/// ```
64/// use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
65///
66/// let selectors = vec![
67///     RowSelector::skip(5),
68///     RowSelector::select(5),
69///     RowSelector::select(5),
70///     RowSelector::skip(5),
71/// ];
72///
73/// // Creating a selection will combine adjacent selectors
74/// let selection: RowSelection = selectors.into();
75///
76/// let expected = vec![
77///     RowSelector::skip(5),
78///     RowSelector::select(10),
79///     RowSelector::skip(5),
80/// ];
81///
82/// let actual: Vec<RowSelector> = selection.into();
83/// assert_eq!(actual, expected);
84///
85/// // you can also create a selection from consecutive ranges
86/// let ranges = vec![5..10, 10..15];
87/// let selection =
88///   RowSelection::from_consecutive_ranges(ranges.into_iter(), 20);
89/// let actual: Vec<RowSelector> = selection.into();
90/// assert_eq!(actual, expected);
91/// ```
92///
93/// A [`RowSelection`] maintains the following invariants:
94///
95/// * It contains no [`RowSelector`] of 0 rows
96/// * Consecutive [`RowSelector`]s alternate skipping or selecting rows
97///
98/// [`PageIndex`]: crate::file::page_index::index::PageIndex
99#[derive(Debug, Clone, Default, Eq, PartialEq)]
100pub struct RowSelection {
101    selectors: Vec<RowSelector>,
102}
103
104impl RowSelection {
105    /// Creates a [`RowSelection`] from a slice of [`BooleanArray`]
106    ///
107    /// # Panic
108    ///
109    /// Panics if any of the [`BooleanArray`] contain nulls
110    pub fn from_filters(filters: &[BooleanArray]) -> Self {
111        let mut next_offset = 0;
112        let total_rows = filters.iter().map(|x| x.len()).sum();
113
114        let iter = filters.iter().flat_map(|filter| {
115            let offset = next_offset;
116            next_offset += filter.len();
117            assert_eq!(filter.null_count(), 0);
118            SlicesIterator::new(filter).map(move |(start, end)| start + offset..end + offset)
119        });
120
121        Self::from_consecutive_ranges(iter, total_rows)
122    }
123
124    /// Creates a [`RowSelection`] from an iterator of consecutive ranges to keep
125    pub fn from_consecutive_ranges<I: Iterator<Item = Range<usize>>>(
126        ranges: I,
127        total_rows: usize,
128    ) -> Self {
129        let mut selectors: Vec<RowSelector> = Vec::with_capacity(ranges.size_hint().0);
130        let mut last_end = 0;
131        for range in ranges {
132            let len = range.end - range.start;
133            if len == 0 {
134                continue;
135            }
136
137            match range.start.cmp(&last_end) {
138                Ordering::Equal => match selectors.last_mut() {
139                    Some(last) => last.row_count = last.row_count.checked_add(len).unwrap(),
140                    None => selectors.push(RowSelector::select(len)),
141                },
142                Ordering::Greater => {
143                    selectors.push(RowSelector::skip(range.start - last_end));
144                    selectors.push(RowSelector::select(len))
145                }
146                Ordering::Less => panic!("out of order"),
147            }
148            last_end = range.end;
149        }
150
151        if last_end != total_rows {
152            selectors.push(RowSelector::skip(total_rows - last_end))
153        }
154
155        Self { selectors }
156    }
157
158    /// Given an offset index, return the byte ranges for all data pages selected by `self`
159    ///
160    /// This is useful for determining what byte ranges to fetch from underlying storage
161    ///
162    /// Note: this method does not make any effort to combine consecutive ranges, nor coalesce
163    /// ranges that are close together. This is instead delegated to the IO subsystem to optimise,
164    /// e.g. [`ObjectStore::get_ranges`](object_store::ObjectStore::get_ranges)
165    pub fn scan_ranges(&self, page_locations: &[crate::format::PageLocation]) -> Vec<Range<usize>> {
166        let mut ranges = vec![];
167        let mut row_offset = 0;
168
169        let mut pages = page_locations.iter().peekable();
170        let mut selectors = self.selectors.iter().cloned();
171        let mut current_selector = selectors.next();
172        let mut current_page = pages.next();
173
174        let mut current_page_included = false;
175
176        while let Some((selector, page)) = current_selector.as_mut().zip(current_page) {
177            if !(selector.skip || current_page_included) {
178                let start = page.offset as usize;
179                let end = start + page.compressed_page_size as usize;
180                ranges.push(start..end);
181                current_page_included = true;
182            }
183
184            if let Some(next_page) = pages.peek() {
185                if row_offset + selector.row_count > next_page.first_row_index as usize {
186                    let remaining_in_page = next_page.first_row_index as usize - row_offset;
187                    selector.row_count -= remaining_in_page;
188                    row_offset += remaining_in_page;
189                    current_page = pages.next();
190                    current_page_included = false;
191
192                    continue;
193                } else {
194                    if row_offset + selector.row_count == next_page.first_row_index as usize {
195                        current_page = pages.next();
196                        current_page_included = false;
197                    }
198                    row_offset += selector.row_count;
199                    current_selector = selectors.next();
200                }
201            } else {
202                if !(selector.skip || current_page_included) {
203                    let start = page.offset as usize;
204                    let end = start + page.compressed_page_size as usize;
205                    ranges.push(start..end);
206                }
207                current_selector = selectors.next()
208            }
209        }
210
211        ranges
212    }
213
214    /// Splits off the first `row_count` from this [`RowSelection`]
215    pub fn split_off(&mut self, row_count: usize) -> Self {
216        let mut total_count = 0;
217
218        // Find the index where the selector exceeds the row count
219        let find = self.selectors.iter().position(|selector| {
220            total_count += selector.row_count;
221            total_count > row_count
222        });
223
224        let split_idx = match find {
225            Some(idx) => idx,
226            None => {
227                let selectors = std::mem::take(&mut self.selectors);
228                return Self { selectors };
229            }
230        };
231
232        let mut remaining = self.selectors.split_off(split_idx);
233
234        // Always present as `split_idx < self.selectors.len`
235        let next = remaining.first_mut().unwrap();
236        let overflow = total_count - row_count;
237
238        if next.row_count != overflow {
239            self.selectors.push(RowSelector {
240                row_count: next.row_count - overflow,
241                skip: next.skip,
242            })
243        }
244        next.row_count = overflow;
245
246        std::mem::swap(&mut remaining, &mut self.selectors);
247        Self {
248            selectors: remaining,
249        }
250    }
251    /// returns a [`RowSelection`] representing rows that are selected in both
252    /// input [`RowSelection`]s.
253    ///
254    /// This is equivalent to the logical `AND` / conjunction of the two
255    /// selections.
256    ///
257    /// # Example
258    /// If `N` means the row is not selected, and `Y` means it is
259    /// selected:
260    ///
261    /// ```text
262    /// self:     NNNNNNNNNNNNYYYYYYYYYYYYYYYYYYYYYYNNNYYYYY
263    /// other:                YYYYYNNNNYYYYYYYYYYYYY   YYNNN
264    ///
265    /// returned: NNNNNNNNNNNNYYYYYNNNNYYYYYYYYYYYYYNNNYYNNN
266    /// ```
267    ///
268    /// # Panics
269    ///
270    /// Panics if `other` does not have a length equal to the number of rows selected
271    /// by this RowSelection
272    ///
273    pub fn and_then(&self, other: &Self) -> Self {
274        let mut selectors = vec![];
275        let mut first = self.selectors.iter().cloned().peekable();
276        let mut second = other.selectors.iter().cloned().peekable();
277
278        let mut to_skip = 0;
279        while let Some(b) = second.peek_mut() {
280            let a = first
281                .peek_mut()
282                .expect("selection exceeds the number of selected rows");
283
284            if b.row_count == 0 {
285                second.next().unwrap();
286                continue;
287            }
288
289            if a.row_count == 0 {
290                first.next().unwrap();
291                continue;
292            }
293
294            if a.skip {
295                // Records were skipped when producing second
296                to_skip += a.row_count;
297                first.next().unwrap();
298                continue;
299            }
300
301            let skip = b.skip;
302            let to_process = a.row_count.min(b.row_count);
303
304            a.row_count -= to_process;
305            b.row_count -= to_process;
306
307            match skip {
308                true => to_skip += to_process,
309                false => {
310                    if to_skip != 0 {
311                        selectors.push(RowSelector::skip(to_skip));
312                        to_skip = 0;
313                    }
314                    selectors.push(RowSelector::select(to_process))
315                }
316            }
317        }
318
319        for v in first {
320            if v.row_count != 0 {
321                assert!(
322                    v.skip,
323                    "selection contains less than the number of selected rows"
324                );
325                to_skip += v.row_count
326            }
327        }
328
329        if to_skip != 0 {
330            selectors.push(RowSelector::skip(to_skip));
331        }
332
333        Self { selectors }
334    }
335
336    /// Compute the intersection of two [`RowSelection`]
337    /// For example:
338    /// self:      NNYYYYNNYYNYN
339    /// other:     NYNNNNNNY
340    ///
341    /// returned:  NNNNNNNNYYNYN
342    pub fn intersection(&self, other: &Self) -> Self {
343        intersect_row_selections(&self.selectors, &other.selectors)
344    }
345
346    /// Compute the union of two [`RowSelection`]
347    /// For example:
348    /// self:      NNYYYYNNYYNYN
349    /// other:     NYNNNNNNN
350    ///
351    /// returned:  NYYYYYNNYYNYN
352    pub fn union(&self, other: &Self) -> Self {
353        union_row_selections(&self.selectors, &other.selectors)
354    }
355
356    /// Returns `true` if this [`RowSelection`] selects any rows
357    pub fn selects_any(&self) -> bool {
358        self.selectors.iter().any(|x| !x.skip)
359    }
360
361    /// Trims this [`RowSelection`] removing any trailing skips
362    pub(crate) fn trim(mut self) -> Self {
363        while self.selectors.last().map(|x| x.skip).unwrap_or(false) {
364            self.selectors.pop();
365        }
366        self
367    }
368
369    /// Applies an offset to this [`RowSelection`], skipping the first `offset` selected rows
370    pub(crate) fn offset(mut self, offset: usize) -> Self {
371        if offset == 0 {
372            return self;
373        }
374
375        let mut selected_count = 0;
376        let mut skipped_count = 0;
377
378        // Find the index where the selector exceeds the row count
379        let find = self
380            .selectors
381            .iter()
382            .position(|selector| match selector.skip {
383                true => {
384                    skipped_count += selector.row_count;
385                    false
386                }
387                false => {
388                    selected_count += selector.row_count;
389                    selected_count > offset
390                }
391            });
392
393        let split_idx = match find {
394            Some(idx) => idx,
395            None => {
396                self.selectors.clear();
397                return self;
398            }
399        };
400
401        let mut selectors = Vec::with_capacity(self.selectors.len() - split_idx + 1);
402        selectors.push(RowSelector::skip(skipped_count + offset));
403        selectors.push(RowSelector::select(selected_count - offset));
404        selectors.extend_from_slice(&self.selectors[split_idx + 1..]);
405
406        Self { selectors }
407    }
408
409    /// Limit this [`RowSelection`] to only select `limit` rows
410    pub(crate) fn limit(mut self, mut limit: usize) -> Self {
411        if limit == 0 {
412            self.selectors.clear();
413        }
414
415        for (idx, selection) in self.selectors.iter_mut().enumerate() {
416            if !selection.skip {
417                if selection.row_count >= limit {
418                    selection.row_count = limit;
419                    self.selectors.truncate(idx + 1);
420                    break;
421                } else {
422                    limit -= selection.row_count;
423                }
424            }
425        }
426        self
427    }
428
429    /// Returns an iterator over the [`RowSelector`]s for this
430    /// [`RowSelection`].
431    pub fn iter(&self) -> impl Iterator<Item = &RowSelector> {
432        self.selectors.iter()
433    }
434
435    /// Returns the number of selected rows
436    pub fn row_count(&self) -> usize {
437        self.iter().filter(|s| !s.skip).map(|s| s.row_count).sum()
438    }
439
440    /// Returns the number of de-selected rows
441    pub fn skipped_row_count(&self) -> usize {
442        self.iter().filter(|s| s.skip).map(|s| s.row_count).sum()
443    }
444}
445
446impl From<Vec<RowSelector>> for RowSelection {
447    fn from(selectors: Vec<RowSelector>) -> Self {
448        selectors.into_iter().collect()
449    }
450}
451
452impl FromIterator<RowSelector> for RowSelection {
453    fn from_iter<T: IntoIterator<Item = RowSelector>>(iter: T) -> Self {
454        let iter = iter.into_iter();
455
456        // Capacity before filter
457        let mut selectors = Vec::with_capacity(iter.size_hint().0);
458
459        let mut filtered = iter.filter(|x| x.row_count != 0);
460        if let Some(x) = filtered.next() {
461            selectors.push(x);
462        }
463
464        for s in filtered {
465            if s.row_count == 0 {
466                continue;
467            }
468
469            // Combine consecutive selectors
470            let last = selectors.last_mut().unwrap();
471            if last.skip == s.skip {
472                last.row_count = last.row_count.checked_add(s.row_count).unwrap();
473            } else {
474                selectors.push(s)
475            }
476        }
477
478        Self { selectors }
479    }
480}
481
482impl From<RowSelection> for Vec<RowSelector> {
483    fn from(r: RowSelection) -> Self {
484        r.selectors
485    }
486}
487
488impl From<RowSelection> for VecDeque<RowSelector> {
489    fn from(r: RowSelection) -> Self {
490        r.selectors.into()
491    }
492}
493
494/// Combine two lists of `RowSelection` return the intersection of them
495/// For example:
496/// self:      NNYYYYNNYYNYN
497/// other:     NYNNNNNNY
498///
499/// returned:  NNNNNNNNYYNYN
500fn intersect_row_selections(left: &[RowSelector], right: &[RowSelector]) -> RowSelection {
501    let mut l_iter = left.iter().copied().peekable();
502    let mut r_iter = right.iter().copied().peekable();
503
504    let iter = std::iter::from_fn(move || {
505        loop {
506            let l = l_iter.peek_mut();
507            let r = r_iter.peek_mut();
508
509            match (l, r) {
510                (Some(a), _) if a.row_count == 0 => {
511                    l_iter.next().unwrap();
512                }
513                (_, Some(b)) if b.row_count == 0 => {
514                    r_iter.next().unwrap();
515                }
516                (Some(l), Some(r)) => {
517                    return match (l.skip, r.skip) {
518                        // Keep both ranges
519                        (false, false) => {
520                            if l.row_count < r.row_count {
521                                r.row_count -= l.row_count;
522                                l_iter.next()
523                            } else {
524                                l.row_count -= r.row_count;
525                                r_iter.next()
526                            }
527                        }
528                        // skip at least one
529                        _ => {
530                            if l.row_count < r.row_count {
531                                let skip = l.row_count;
532                                r.row_count -= l.row_count;
533                                l_iter.next();
534                                Some(RowSelector::skip(skip))
535                            } else {
536                                let skip = r.row_count;
537                                l.row_count -= skip;
538                                r_iter.next();
539                                Some(RowSelector::skip(skip))
540                            }
541                        }
542                    };
543                }
544                (Some(_), None) => return l_iter.next(),
545                (None, Some(_)) => return r_iter.next(),
546                (None, None) => return None,
547            }
548        }
549    });
550
551    iter.collect()
552}
553
554/// Combine two lists of `RowSelector` return the union of them
555/// For example:
556/// self:      NNYYYYNNYYNYN
557/// other:     NYNNNNNNY
558///
559/// returned:  NYYYYYNNYYNYN
560///
561/// This can be removed from here once RowSelection::union is in parquet::arrow
562fn union_row_selections(left: &[RowSelector], right: &[RowSelector]) -> RowSelection {
563    let mut l_iter = left.iter().copied().peekable();
564    let mut r_iter = right.iter().copied().peekable();
565
566    let iter = std::iter::from_fn(move || {
567        loop {
568            let l = l_iter.peek_mut();
569            let r = r_iter.peek_mut();
570
571            match (l, r) {
572                (Some(a), _) if a.row_count == 0 => {
573                    l_iter.next().unwrap();
574                }
575                (_, Some(b)) if b.row_count == 0 => {
576                    r_iter.next().unwrap();
577                }
578                (Some(l), Some(r)) => {
579                    return match (l.skip, r.skip) {
580                        // Skip both ranges
581                        (true, true) => {
582                            if l.row_count < r.row_count {
583                                let skip = l.row_count;
584                                r.row_count -= l.row_count;
585                                l_iter.next();
586                                Some(RowSelector::skip(skip))
587                            } else {
588                                let skip = r.row_count;
589                                l.row_count -= skip;
590                                r_iter.next();
591                                Some(RowSelector::skip(skip))
592                            }
593                        }
594                        // Keep rows from left
595                        (false, true) => {
596                            if l.row_count < r.row_count {
597                                r.row_count -= l.row_count;
598                                l_iter.next()
599                            } else {
600                                let r_row_count = r.row_count;
601                                l.row_count -= r_row_count;
602                                r_iter.next();
603                                Some(RowSelector::select(r_row_count))
604                            }
605                        }
606                        // Keep rows from right
607                        (true, false) => {
608                            if l.row_count < r.row_count {
609                                let l_row_count = l.row_count;
610                                r.row_count -= l_row_count;
611                                l_iter.next();
612                                Some(RowSelector::select(l_row_count))
613                            } else {
614                                l.row_count -= r.row_count;
615                                r_iter.next()
616                            }
617                        }
618                        // Keep at least one
619                        _ => {
620                            if l.row_count < r.row_count {
621                                r.row_count -= l.row_count;
622                                l_iter.next()
623                            } else {
624                                l.row_count -= r.row_count;
625                                r_iter.next()
626                            }
627                        }
628                    };
629                }
630                (Some(_), None) => return l_iter.next(),
631                (None, Some(_)) => return r_iter.next(),
632                (None, None) => return None,
633            }
634        }
635    });
636
637    iter.collect()
638}
639
640#[cfg(test)]
641mod tests {
642    use super::*;
643    use crate::format::PageLocation;
644    use rand::{thread_rng, Rng};
645
646    #[test]
647    fn test_from_filters() {
648        let filters = vec![
649            BooleanArray::from(vec![false, false, false, true, true, true, true]),
650            BooleanArray::from(vec![true, true, false, false, true, true, true]),
651            BooleanArray::from(vec![false, false, false, false]),
652            BooleanArray::from(Vec::<bool>::new()),
653        ];
654
655        let selection = RowSelection::from_filters(&filters[..1]);
656        assert!(selection.selects_any());
657        assert_eq!(
658            selection.selectors,
659            vec![RowSelector::skip(3), RowSelector::select(4)]
660        );
661
662        let selection = RowSelection::from_filters(&filters[..2]);
663        assert!(selection.selects_any());
664        assert_eq!(
665            selection.selectors,
666            vec![
667                RowSelector::skip(3),
668                RowSelector::select(6),
669                RowSelector::skip(2),
670                RowSelector::select(3)
671            ]
672        );
673
674        let selection = RowSelection::from_filters(&filters);
675        assert!(selection.selects_any());
676        assert_eq!(
677            selection.selectors,
678            vec![
679                RowSelector::skip(3),
680                RowSelector::select(6),
681                RowSelector::skip(2),
682                RowSelector::select(3),
683                RowSelector::skip(4)
684            ]
685        );
686
687        let selection = RowSelection::from_filters(&filters[2..3]);
688        assert!(!selection.selects_any());
689        assert_eq!(selection.selectors, vec![RowSelector::skip(4)]);
690    }
691
692    #[test]
693    fn test_split_off() {
694        let mut selection = RowSelection::from(vec![
695            RowSelector::skip(34),
696            RowSelector::select(12),
697            RowSelector::skip(3),
698            RowSelector::select(35),
699        ]);
700
701        let split = selection.split_off(34);
702        assert_eq!(split.selectors, vec![RowSelector::skip(34)]);
703        assert_eq!(
704            selection.selectors,
705            vec![
706                RowSelector::select(12),
707                RowSelector::skip(3),
708                RowSelector::select(35)
709            ]
710        );
711
712        let split = selection.split_off(5);
713        assert_eq!(split.selectors, vec![RowSelector::select(5)]);
714        assert_eq!(
715            selection.selectors,
716            vec![
717                RowSelector::select(7),
718                RowSelector::skip(3),
719                RowSelector::select(35)
720            ]
721        );
722
723        let split = selection.split_off(8);
724        assert_eq!(
725            split.selectors,
726            vec![RowSelector::select(7), RowSelector::skip(1)]
727        );
728        assert_eq!(
729            selection.selectors,
730            vec![RowSelector::skip(2), RowSelector::select(35)]
731        );
732
733        let split = selection.split_off(200);
734        assert_eq!(
735            split.selectors,
736            vec![RowSelector::skip(2), RowSelector::select(35)]
737        );
738        assert!(selection.selectors.is_empty());
739    }
740
741    #[test]
742    fn test_offset() {
743        let selection = RowSelection::from(vec![
744            RowSelector::select(5),
745            RowSelector::skip(23),
746            RowSelector::select(7),
747            RowSelector::skip(33),
748            RowSelector::select(6),
749        ]);
750
751        let selection = selection.offset(2);
752        assert_eq!(
753            selection.selectors,
754            vec![
755                RowSelector::skip(2),
756                RowSelector::select(3),
757                RowSelector::skip(23),
758                RowSelector::select(7),
759                RowSelector::skip(33),
760                RowSelector::select(6),
761            ]
762        );
763
764        let selection = selection.offset(5);
765        assert_eq!(
766            selection.selectors,
767            vec![
768                RowSelector::skip(30),
769                RowSelector::select(5),
770                RowSelector::skip(33),
771                RowSelector::select(6),
772            ]
773        );
774
775        let selection = selection.offset(3);
776        assert_eq!(
777            selection.selectors,
778            vec![
779                RowSelector::skip(33),
780                RowSelector::select(2),
781                RowSelector::skip(33),
782                RowSelector::select(6),
783            ]
784        );
785
786        let selection = selection.offset(2);
787        assert_eq!(
788            selection.selectors,
789            vec![RowSelector::skip(68), RowSelector::select(6),]
790        );
791
792        let selection = selection.offset(3);
793        assert_eq!(
794            selection.selectors,
795            vec![RowSelector::skip(71), RowSelector::select(3),]
796        );
797    }
798
799    #[test]
800    fn test_and() {
801        let mut a = RowSelection::from(vec![
802            RowSelector::skip(12),
803            RowSelector::select(23),
804            RowSelector::skip(3),
805            RowSelector::select(5),
806        ]);
807
808        let b = RowSelection::from(vec![
809            RowSelector::select(5),
810            RowSelector::skip(4),
811            RowSelector::select(15),
812            RowSelector::skip(4),
813        ]);
814
815        let mut expected = RowSelection::from(vec![
816            RowSelector::skip(12),
817            RowSelector::select(5),
818            RowSelector::skip(4),
819            RowSelector::select(14),
820            RowSelector::skip(3),
821            RowSelector::select(1),
822            RowSelector::skip(4),
823        ]);
824
825        assert_eq!(a.and_then(&b), expected);
826
827        a.split_off(7);
828        expected.split_off(7);
829        assert_eq!(a.and_then(&b), expected);
830
831        let a = RowSelection::from(vec![RowSelector::select(5), RowSelector::skip(3)]);
832
833        let b = RowSelection::from(vec![
834            RowSelector::select(2),
835            RowSelector::skip(1),
836            RowSelector::select(1),
837            RowSelector::skip(1),
838        ]);
839
840        assert_eq!(
841            a.and_then(&b).selectors,
842            vec![
843                RowSelector::select(2),
844                RowSelector::skip(1),
845                RowSelector::select(1),
846                RowSelector::skip(4)
847            ]
848        );
849    }
850
851    #[test]
852    fn test_combine() {
853        let a = vec![
854            RowSelector::skip(3),
855            RowSelector::skip(3),
856            RowSelector::select(10),
857            RowSelector::skip(4),
858        ];
859
860        let b = vec![
861            RowSelector::skip(3),
862            RowSelector::skip(3),
863            RowSelector::select(10),
864            RowSelector::skip(4),
865            RowSelector::skip(0),
866        ];
867
868        let c = vec![
869            RowSelector::skip(2),
870            RowSelector::skip(4),
871            RowSelector::select(3),
872            RowSelector::select(3),
873            RowSelector::select(4),
874            RowSelector::skip(3),
875            RowSelector::skip(1),
876            RowSelector::skip(0),
877        ];
878
879        let expected = RowSelection::from(vec![
880            RowSelector::skip(6),
881            RowSelector::select(10),
882            RowSelector::skip(4),
883        ]);
884
885        assert_eq!(RowSelection::from_iter(a), expected);
886        assert_eq!(RowSelection::from_iter(b), expected);
887        assert_eq!(RowSelection::from_iter(c), expected);
888    }
889
890    #[test]
891    fn test_combine_2elements() {
892        let a = vec![RowSelector::select(10), RowSelector::select(5)];
893        let a_expect = vec![RowSelector::select(15)];
894        assert_eq!(RowSelection::from_iter(a).selectors, a_expect);
895
896        let b = vec![RowSelector::select(10), RowSelector::skip(5)];
897        let b_expect = vec![RowSelector::select(10), RowSelector::skip(5)];
898        assert_eq!(RowSelection::from_iter(b).selectors, b_expect);
899
900        let c = vec![RowSelector::skip(10), RowSelector::select(5)];
901        let c_expect = vec![RowSelector::skip(10), RowSelector::select(5)];
902        assert_eq!(RowSelection::from_iter(c).selectors, c_expect);
903
904        let d = vec![RowSelector::skip(10), RowSelector::skip(5)];
905        let d_expect = vec![RowSelector::skip(15)];
906        assert_eq!(RowSelection::from_iter(d).selectors, d_expect);
907    }
908
909    #[test]
910    fn test_from_one_and_empty() {
911        let a = vec![RowSelector::select(10)];
912        let selection1 = RowSelection::from(a.clone());
913        assert_eq!(selection1.selectors, a);
914
915        let b = vec![];
916        let selection1 = RowSelection::from(b.clone());
917        assert_eq!(selection1.selectors, b)
918    }
919
920    #[test]
921    #[should_panic(expected = "selection exceeds the number of selected rows")]
922    fn test_and_longer() {
923        let a = RowSelection::from(vec![
924            RowSelector::select(3),
925            RowSelector::skip(33),
926            RowSelector::select(3),
927            RowSelector::skip(33),
928        ]);
929        let b = RowSelection::from(vec![RowSelector::select(36)]);
930        a.and_then(&b);
931    }
932
933    #[test]
934    #[should_panic(expected = "selection contains less than the number of selected rows")]
935    fn test_and_shorter() {
936        let a = RowSelection::from(vec![
937            RowSelector::select(3),
938            RowSelector::skip(33),
939            RowSelector::select(3),
940            RowSelector::skip(33),
941        ]);
942        let b = RowSelection::from(vec![RowSelector::select(3)]);
943        a.and_then(&b);
944    }
945
946    #[test]
947    fn test_intersect_row_selection_and_combine() {
948        // a size equal b size
949        let a = vec![
950            RowSelector::select(5),
951            RowSelector::skip(4),
952            RowSelector::select(1),
953        ];
954        let b = vec![
955            RowSelector::select(8),
956            RowSelector::skip(1),
957            RowSelector::select(1),
958        ];
959
960        let res = intersect_row_selections(&a, &b);
961        assert_eq!(
962            res.selectors,
963            vec![
964                RowSelector::select(5),
965                RowSelector::skip(4),
966                RowSelector::select(1),
967            ],
968        );
969
970        // a size larger than b size
971        let a = vec![
972            RowSelector::select(3),
973            RowSelector::skip(33),
974            RowSelector::select(3),
975            RowSelector::skip(33),
976        ];
977        let b = vec![RowSelector::select(36), RowSelector::skip(36)];
978        let res = intersect_row_selections(&a, &b);
979        assert_eq!(
980            res.selectors,
981            vec![RowSelector::select(3), RowSelector::skip(69)]
982        );
983
984        // a size less than b size
985        let a = vec![RowSelector::select(3), RowSelector::skip(7)];
986        let b = vec![
987            RowSelector::select(2),
988            RowSelector::skip(2),
989            RowSelector::select(2),
990            RowSelector::skip(2),
991            RowSelector::select(2),
992        ];
993        let res = intersect_row_selections(&a, &b);
994        assert_eq!(
995            res.selectors,
996            vec![RowSelector::select(2), RowSelector::skip(8)]
997        );
998
999        let a = vec![RowSelector::select(3), RowSelector::skip(7)];
1000        let b = vec![
1001            RowSelector::select(2),
1002            RowSelector::skip(2),
1003            RowSelector::select(2),
1004            RowSelector::skip(2),
1005            RowSelector::select(2),
1006        ];
1007        let res = intersect_row_selections(&a, &b);
1008        assert_eq!(
1009            res.selectors,
1010            vec![RowSelector::select(2), RowSelector::skip(8)]
1011        );
1012    }
1013
1014    #[test]
1015    fn test_and_fuzz() {
1016        let mut rand = thread_rng();
1017        for _ in 0..100 {
1018            let a_len = rand.gen_range(10..100);
1019            let a_bools: Vec<_> = (0..a_len).map(|_| rand.gen_bool(0.2)).collect();
1020            let a = RowSelection::from_filters(&[BooleanArray::from(a_bools.clone())]);
1021
1022            let b_len: usize = a_bools.iter().map(|x| *x as usize).sum();
1023            let b_bools: Vec<_> = (0..b_len).map(|_| rand.gen_bool(0.8)).collect();
1024            let b = RowSelection::from_filters(&[BooleanArray::from(b_bools.clone())]);
1025
1026            let mut expected_bools = vec![false; a_len];
1027
1028            let mut iter_b = b_bools.iter();
1029            for (idx, b) in a_bools.iter().enumerate() {
1030                if *b && *iter_b.next().unwrap() {
1031                    expected_bools[idx] = true;
1032                }
1033            }
1034
1035            let expected = RowSelection::from_filters(&[BooleanArray::from(expected_bools)]);
1036
1037            let total_rows: usize = expected.selectors.iter().map(|s| s.row_count).sum();
1038            assert_eq!(a_len, total_rows);
1039
1040            assert_eq!(a.and_then(&b), expected);
1041        }
1042    }
1043
1044    #[test]
1045    fn test_iter() {
1046        // use the iter() API to show it does what is expected and
1047        // avoid accidental deletion
1048        let selectors = vec![
1049            RowSelector::select(3),
1050            RowSelector::skip(33),
1051            RowSelector::select(4),
1052        ];
1053
1054        let round_tripped = RowSelection::from(selectors.clone())
1055            .iter()
1056            .cloned()
1057            .collect::<Vec<_>>();
1058        assert_eq!(selectors, round_tripped);
1059    }
1060
1061    #[test]
1062    fn test_limit() {
1063        // Limit to existing limit should no-op
1064        let selection = RowSelection::from(vec![RowSelector::select(10), RowSelector::skip(90)]);
1065        let limited = selection.limit(10);
1066        assert_eq!(RowSelection::from(vec![RowSelector::select(10)]), limited);
1067
1068        let selection = RowSelection::from(vec![
1069            RowSelector::select(10),
1070            RowSelector::skip(10),
1071            RowSelector::select(10),
1072            RowSelector::skip(10),
1073            RowSelector::select(10),
1074        ]);
1075
1076        let limited = selection.clone().limit(5);
1077        let expected = vec![RowSelector::select(5)];
1078        assert_eq!(limited.selectors, expected);
1079
1080        let limited = selection.clone().limit(15);
1081        let expected = vec![
1082            RowSelector::select(10),
1083            RowSelector::skip(10),
1084            RowSelector::select(5),
1085        ];
1086        assert_eq!(limited.selectors, expected);
1087
1088        let limited = selection.clone().limit(0);
1089        let expected = vec![];
1090        assert_eq!(limited.selectors, expected);
1091
1092        let limited = selection.clone().limit(30);
1093        let expected = vec![
1094            RowSelector::select(10),
1095            RowSelector::skip(10),
1096            RowSelector::select(10),
1097            RowSelector::skip(10),
1098            RowSelector::select(10),
1099        ];
1100        assert_eq!(limited.selectors, expected);
1101
1102        let limited = selection.limit(100);
1103        let expected = vec![
1104            RowSelector::select(10),
1105            RowSelector::skip(10),
1106            RowSelector::select(10),
1107            RowSelector::skip(10),
1108            RowSelector::select(10),
1109        ];
1110        assert_eq!(limited.selectors, expected);
1111    }
1112
1113    #[test]
1114    fn test_scan_ranges() {
1115        let index = vec![
1116            PageLocation {
1117                offset: 0,
1118                compressed_page_size: 10,
1119                first_row_index: 0,
1120            },
1121            PageLocation {
1122                offset: 10,
1123                compressed_page_size: 10,
1124                first_row_index: 10,
1125            },
1126            PageLocation {
1127                offset: 20,
1128                compressed_page_size: 10,
1129                first_row_index: 20,
1130            },
1131            PageLocation {
1132                offset: 30,
1133                compressed_page_size: 10,
1134                first_row_index: 30,
1135            },
1136            PageLocation {
1137                offset: 40,
1138                compressed_page_size: 10,
1139                first_row_index: 40,
1140            },
1141            PageLocation {
1142                offset: 50,
1143                compressed_page_size: 10,
1144                first_row_index: 50,
1145            },
1146            PageLocation {
1147                offset: 60,
1148                compressed_page_size: 10,
1149                first_row_index: 60,
1150            },
1151        ];
1152
1153        let selection = RowSelection::from(vec![
1154            // Skip first page
1155            RowSelector::skip(10),
1156            // Multiple selects in same page
1157            RowSelector::select(3),
1158            RowSelector::skip(3),
1159            RowSelector::select(4),
1160            // Select to page boundary
1161            RowSelector::skip(5),
1162            RowSelector::select(5),
1163            // Skip full page past page boundary
1164            RowSelector::skip(12),
1165            // Select across page boundaries
1166            RowSelector::select(12),
1167            // Skip final page
1168            RowSelector::skip(12),
1169        ]);
1170
1171        let ranges = selection.scan_ranges(&index);
1172
1173        // assert_eq!(mask, vec![false, true, true, false, true, true, false]);
1174        assert_eq!(ranges, vec![10..20, 20..30, 40..50, 50..60]);
1175
1176        let selection = RowSelection::from(vec![
1177            // Skip first page
1178            RowSelector::skip(10),
1179            // Multiple selects in same page
1180            RowSelector::select(3),
1181            RowSelector::skip(3),
1182            RowSelector::select(4),
1183            // Select to page boundary
1184            RowSelector::skip(5),
1185            RowSelector::select(5),
1186            // Skip full page past page boundary
1187            RowSelector::skip(12),
1188            // Select across page boundaries
1189            RowSelector::select(12),
1190            RowSelector::skip(1),
1191            // Select across page boundaries including final page
1192            RowSelector::select(8),
1193        ]);
1194
1195        let ranges = selection.scan_ranges(&index);
1196
1197        // assert_eq!(mask, vec![false, true, true, false, true, true, true]);
1198        assert_eq!(ranges, vec![10..20, 20..30, 40..50, 50..60, 60..70]);
1199
1200        let selection = RowSelection::from(vec![
1201            // Skip first page
1202            RowSelector::skip(10),
1203            // Multiple selects in same page
1204            RowSelector::select(3),
1205            RowSelector::skip(3),
1206            RowSelector::select(4),
1207            // Select to page boundary
1208            RowSelector::skip(5),
1209            RowSelector::select(5),
1210            // Skip full page past page boundary
1211            RowSelector::skip(12),
1212            // Select to final page boundary
1213            RowSelector::select(12),
1214            RowSelector::skip(1),
1215            // Skip across final page boundary
1216            RowSelector::skip(8),
1217            // Select from final page
1218            RowSelector::select(4),
1219        ]);
1220
1221        let ranges = selection.scan_ranges(&index);
1222
1223        // assert_eq!(mask, vec![false, true, true, false, true, true, true]);
1224        assert_eq!(ranges, vec![10..20, 20..30, 40..50, 50..60, 60..70]);
1225
1226        let selection = RowSelection::from(vec![
1227            // Skip first page
1228            RowSelector::skip(10),
1229            // Multiple selects in same page
1230            RowSelector::select(3),
1231            RowSelector::skip(3),
1232            RowSelector::select(4),
1233            // Select to remaining in page and first row of next page
1234            RowSelector::skip(5),
1235            RowSelector::select(6),
1236            // Skip remaining
1237            RowSelector::skip(50),
1238        ]);
1239
1240        let ranges = selection.scan_ranges(&index);
1241
1242        // assert_eq!(mask, vec![false, true, true, false, true, true, true]);
1243        assert_eq!(ranges, vec![10..20, 20..30, 30..40]);
1244    }
1245
1246    #[test]
1247    fn test_from_ranges() {
1248        let ranges = [1..3, 4..6, 6..6, 8..8, 9..10];
1249        let selection = RowSelection::from_consecutive_ranges(ranges.into_iter(), 10);
1250        assert_eq!(
1251            selection.selectors,
1252            vec![
1253                RowSelector::skip(1),
1254                RowSelector::select(2),
1255                RowSelector::skip(1),
1256                RowSelector::select(2),
1257                RowSelector::skip(3),
1258                RowSelector::select(1)
1259            ]
1260        );
1261
1262        let out_of_order_ranges = [1..3, 8..10, 4..7];
1263        let result = std::panic::catch_unwind(|| {
1264            RowSelection::from_consecutive_ranges(out_of_order_ranges.into_iter(), 10)
1265        });
1266        assert!(result.is_err());
1267    }
1268
1269    #[test]
1270    fn test_empty_selector() {
1271        let selection = RowSelection::from(vec![
1272            RowSelector::skip(0),
1273            RowSelector::select(2),
1274            RowSelector::skip(0),
1275            RowSelector::select(2),
1276        ]);
1277        assert_eq!(selection.selectors, vec![RowSelector::select(4)]);
1278
1279        let selection = RowSelection::from(vec![
1280            RowSelector::select(0),
1281            RowSelector::skip(2),
1282            RowSelector::select(0),
1283            RowSelector::skip(2),
1284        ]);
1285        assert_eq!(selection.selectors, vec![RowSelector::skip(4)]);
1286    }
1287
1288    #[test]
1289    fn test_intersection() {
1290        let selection = RowSelection::from(vec![RowSelector::select(1048576)]);
1291        let result = selection.intersection(&selection);
1292        assert_eq!(result, selection);
1293
1294        let a = RowSelection::from(vec![
1295            RowSelector::skip(10),
1296            RowSelector::select(10),
1297            RowSelector::skip(10),
1298            RowSelector::select(20),
1299        ]);
1300
1301        let b = RowSelection::from(vec![
1302            RowSelector::skip(20),
1303            RowSelector::select(20),
1304            RowSelector::skip(10),
1305        ]);
1306
1307        let result = a.intersection(&b);
1308        assert_eq!(
1309            result.selectors,
1310            vec![
1311                RowSelector::skip(30),
1312                RowSelector::select(10),
1313                RowSelector::skip(10)
1314            ]
1315        );
1316    }
1317
1318    #[test]
1319    fn test_union() {
1320        let selection = RowSelection::from(vec![RowSelector::select(1048576)]);
1321        let result = selection.union(&selection);
1322        assert_eq!(result, selection);
1323
1324        // NYNYY
1325        let a = RowSelection::from(vec![
1326            RowSelector::skip(10),
1327            RowSelector::select(10),
1328            RowSelector::skip(10),
1329            RowSelector::select(20),
1330        ]);
1331
1332        // NNYYNYN
1333        let b = RowSelection::from(vec![
1334            RowSelector::skip(20),
1335            RowSelector::select(20),
1336            RowSelector::skip(10),
1337            RowSelector::select(10),
1338            RowSelector::skip(10),
1339        ]);
1340
1341        let result = a.union(&b);
1342
1343        // NYYYYYN
1344        assert_eq!(
1345            result.iter().collect::<Vec<_>>(),
1346            vec![
1347                &RowSelector::skip(10),
1348                &RowSelector::select(50),
1349                &RowSelector::skip(10),
1350            ]
1351        );
1352    }
1353
1354    #[test]
1355    fn test_row_count() {
1356        let selection = RowSelection::from(vec![
1357            RowSelector::skip(34),
1358            RowSelector::select(12),
1359            RowSelector::skip(3),
1360            RowSelector::select(35),
1361        ]);
1362
1363        assert_eq!(selection.row_count(), 12 + 35);
1364        assert_eq!(selection.skipped_row_count(), 34 + 3);
1365
1366        let selection = RowSelection::from(vec![RowSelector::select(12), RowSelector::select(35)]);
1367
1368        assert_eq!(selection.row_count(), 12 + 35);
1369        assert_eq!(selection.skipped_row_count(), 0);
1370
1371        let selection = RowSelection::from(vec![RowSelector::skip(34), RowSelector::skip(3)]);
1372
1373        assert_eq!(selection.row_count(), 0);
1374        assert_eq!(selection.skipped_row_count(), 34 + 3);
1375
1376        let selection = RowSelection::from(vec![]);
1377
1378        assert_eq!(selection.row_count(), 0);
1379        assert_eq!(selection.skipped_row_count(), 0);
1380    }
1381}