parquet/arrow/arrow_reader/
selection.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use arrow_array::{Array, BooleanArray};
19use arrow_select::filter::SlicesIterator;
20use std::cmp::Ordering;
21use std::collections::VecDeque;
22use std::ops::Range;
23
24use crate::file::page_index::offset_index::PageLocation;
25
/// A run of `row_count` consecutive rows that is either selected or skipped.
///
/// A [`RowSelection`] is a collection of [`RowSelector`]s used to skip rows when
/// scanning a parquet file
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub struct RowSelector {
    /// The number of consecutive rows covered by this selector
    pub row_count: usize,

    /// If true, skip `row_count` rows; if false, select them
    pub skip: bool,
}
36
37impl RowSelector {
38    /// Select `row_count` rows
39    pub fn select(row_count: usize) -> Self {
40        Self {
41            row_count,
42            skip: false,
43        }
44    }
45
46    /// Skip `row_count` rows
47    pub fn skip(row_count: usize) -> Self {
48        Self {
49            row_count,
50            skip: true,
51        }
52    }
53}
54
/// [`RowSelection`] allows selecting or skipping a provided number of rows
/// when scanning the parquet file.
///
/// This is applied prior to reading column data, and can therefore
/// be used to skip IO to fetch data into memory.
///
/// A typical use-case would be using the [`PageIndex`] to filter out rows
/// that don't satisfy a predicate.
///
/// # Example
/// ```
/// use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
///
/// let selectors = vec![
///     RowSelector::skip(5),
///     RowSelector::select(5),
///     RowSelector::select(5),
///     RowSelector::skip(5),
/// ];
///
/// // Creating a selection will combine adjacent selectors
/// let selection: RowSelection = selectors.into();
///
/// let expected = vec![
///     RowSelector::skip(5),
///     RowSelector::select(10),
///     RowSelector::skip(5),
/// ];
///
/// let actual: Vec<RowSelector> = selection.into();
/// assert_eq!(actual, expected);
///
/// // you can also create a selection from consecutive ranges
/// let ranges = vec![5..10, 10..15];
/// let selection =
///   RowSelection::from_consecutive_ranges(ranges.into_iter(), 20);
/// let actual: Vec<RowSelector> = selection.into();
/// assert_eq!(actual, expected);
/// ```
///
/// # Invariants
///
/// A [`RowSelection`] maintains the following invariants:
///
/// * It contains no [`RowSelector`] of 0 rows
/// * Consecutive [`RowSelector`]s alternate skipping or selecting rows
///
/// [`PageIndex`]: crate::file::page_index::column_index::ColumnIndexMetaData
#[derive(Debug, Clone, Default, Eq, PartialEq)]
pub struct RowSelection {
    // Selectors in row order; each one applies to the rows following the previous one
    selectors: Vec<RowSelector>,
}
105
106impl RowSelection {
107    /// Creates a [`RowSelection`] from a slice of [`BooleanArray`]
108    ///
109    /// # Panic
110    ///
111    /// Panics if any of the [`BooleanArray`] contain nulls
112    pub fn from_filters(filters: &[BooleanArray]) -> Self {
113        let mut next_offset = 0;
114        let total_rows = filters.iter().map(|x| x.len()).sum();
115
116        let iter = filters.iter().flat_map(|filter| {
117            let offset = next_offset;
118            next_offset += filter.len();
119            assert_eq!(filter.null_count(), 0);
120            SlicesIterator::new(filter).map(move |(start, end)| start + offset..end + offset)
121        });
122
123        Self::from_consecutive_ranges(iter, total_rows)
124    }
125
126    /// Creates a [`RowSelection`] from an iterator of consecutive ranges to keep
127    pub fn from_consecutive_ranges<I: Iterator<Item = Range<usize>>>(
128        ranges: I,
129        total_rows: usize,
130    ) -> Self {
131        let mut selectors: Vec<RowSelector> = Vec::with_capacity(ranges.size_hint().0);
132        let mut last_end = 0;
133        for range in ranges {
134            let len = range.end - range.start;
135            if len == 0 {
136                continue;
137            }
138
139            match range.start.cmp(&last_end) {
140                Ordering::Equal => match selectors.last_mut() {
141                    Some(last) => last.row_count = last.row_count.checked_add(len).unwrap(),
142                    None => selectors.push(RowSelector::select(len)),
143                },
144                Ordering::Greater => {
145                    selectors.push(RowSelector::skip(range.start - last_end));
146                    selectors.push(RowSelector::select(len))
147                }
148                Ordering::Less => panic!("out of order"),
149            }
150            last_end = range.end;
151        }
152
153        if last_end != total_rows {
154            selectors.push(RowSelector::skip(total_rows - last_end))
155        }
156
157        Self { selectors }
158    }
159
160    /// Given an offset index, return the byte ranges for all data pages selected by `self`
161    ///
162    /// This is useful for determining what byte ranges to fetch from underlying storage
163    ///
164    /// Note: this method does not make any effort to combine consecutive ranges, nor coalesce
165    /// ranges that are close together. This is instead delegated to the IO subsystem to optimise,
166    /// e.g. [`ObjectStore::get_ranges`](object_store::ObjectStore::get_ranges)
167    pub fn scan_ranges(&self, page_locations: &[PageLocation]) -> Vec<Range<u64>> {
168        let mut ranges: Vec<Range<u64>> = vec![];
169        let mut row_offset = 0;
170
171        let mut pages = page_locations.iter().peekable();
172        let mut selectors = self.selectors.iter().cloned();
173        let mut current_selector = selectors.next();
174        let mut current_page = pages.next();
175
176        let mut current_page_included = false;
177
178        while let Some((selector, page)) = current_selector.as_mut().zip(current_page) {
179            if !(selector.skip || current_page_included) {
180                let start = page.offset as u64;
181                let end = start + page.compressed_page_size as u64;
182                ranges.push(start..end);
183                current_page_included = true;
184            }
185
186            if let Some(next_page) = pages.peek() {
187                if row_offset + selector.row_count > next_page.first_row_index as usize {
188                    let remaining_in_page = next_page.first_row_index as usize - row_offset;
189                    selector.row_count -= remaining_in_page;
190                    row_offset += remaining_in_page;
191                    current_page = pages.next();
192                    current_page_included = false;
193
194                    continue;
195                } else {
196                    if row_offset + selector.row_count == next_page.first_row_index as usize {
197                        current_page = pages.next();
198                        current_page_included = false;
199                    }
200                    row_offset += selector.row_count;
201                    current_selector = selectors.next();
202                }
203            } else {
204                if !(selector.skip || current_page_included) {
205                    let start = page.offset as u64;
206                    let end = start + page.compressed_page_size as u64;
207                    ranges.push(start..end);
208                }
209                current_selector = selectors.next()
210            }
211        }
212
213        ranges
214    }
215
216    /// Splits off the first `row_count` from this [`RowSelection`]
217    pub fn split_off(&mut self, row_count: usize) -> Self {
218        let mut total_count = 0;
219
220        // Find the index where the selector exceeds the row count
221        let find = self.selectors.iter().position(|selector| {
222            total_count += selector.row_count;
223            total_count > row_count
224        });
225
226        let split_idx = match find {
227            Some(idx) => idx,
228            None => {
229                let selectors = std::mem::take(&mut self.selectors);
230                return Self { selectors };
231            }
232        };
233
234        let mut remaining = self.selectors.split_off(split_idx);
235
236        // Always present as `split_idx < self.selectors.len`
237        let next = remaining.first_mut().unwrap();
238        let overflow = total_count - row_count;
239
240        if next.row_count != overflow {
241            self.selectors.push(RowSelector {
242                row_count: next.row_count - overflow,
243                skip: next.skip,
244            })
245        }
246        next.row_count = overflow;
247
248        std::mem::swap(&mut remaining, &mut self.selectors);
249        Self {
250            selectors: remaining,
251        }
252    }
253    /// returns a [`RowSelection`] representing rows that are selected in both
254    /// input [`RowSelection`]s.
255    ///
256    /// This is equivalent to the logical `AND` / conjunction of the two
257    /// selections.
258    ///
259    /// # Example
260    /// If `N` means the row is not selected, and `Y` means it is
261    /// selected:
262    ///
263    /// ```text
264    /// self:     NNNNNNNNNNNNYYYYYYYYYYYYYYYYYYYYYYNNNYYYYY
265    /// other:                YYYYYNNNNYYYYYYYYYYYYY   YYNNN
266    ///
267    /// returned: NNNNNNNNNNNNYYYYYNNNNYYYYYYYYYYYYYNNNYYNNN
268    /// ```
269    ///
270    /// # Panics
271    ///
272    /// Panics if `other` does not have a length equal to the number of rows selected
273    /// by this RowSelection
274    ///
275    pub fn and_then(&self, other: &Self) -> Self {
276        let mut selectors = vec![];
277        let mut first = self.selectors.iter().cloned().peekable();
278        let mut second = other.selectors.iter().cloned().peekable();
279
280        let mut to_skip = 0;
281        while let Some(b) = second.peek_mut() {
282            let a = first
283                .peek_mut()
284                .expect("selection exceeds the number of selected rows");
285
286            if b.row_count == 0 {
287                second.next().unwrap();
288                continue;
289            }
290
291            if a.row_count == 0 {
292                first.next().unwrap();
293                continue;
294            }
295
296            if a.skip {
297                // Records were skipped when producing second
298                to_skip += a.row_count;
299                first.next().unwrap();
300                continue;
301            }
302
303            let skip = b.skip;
304            let to_process = a.row_count.min(b.row_count);
305
306            a.row_count -= to_process;
307            b.row_count -= to_process;
308
309            match skip {
310                true => to_skip += to_process,
311                false => {
312                    if to_skip != 0 {
313                        selectors.push(RowSelector::skip(to_skip));
314                        to_skip = 0;
315                    }
316                    selectors.push(RowSelector::select(to_process))
317                }
318            }
319        }
320
321        for v in first {
322            if v.row_count != 0 {
323                assert!(
324                    v.skip,
325                    "selection contains less than the number of selected rows"
326                );
327                to_skip += v.row_count
328            }
329        }
330
331        if to_skip != 0 {
332            selectors.push(RowSelector::skip(to_skip));
333        }
334
335        Self { selectors }
336    }
337
338    /// Compute the intersection of two [`RowSelection`]
339    /// For example:
340    /// self:      NNYYYYNNYYNYN
341    /// other:     NYNNNNNNY
342    ///
343    /// returned:  NNNNNNNNYYNYN
344    pub fn intersection(&self, other: &Self) -> Self {
345        intersect_row_selections(&self.selectors, &other.selectors)
346    }
347
348    /// Compute the union of two [`RowSelection`]
349    /// For example:
350    /// self:      NNYYYYNNYYNYN
351    /// other:     NYNNNNNNN
352    ///
353    /// returned:  NYYYYYNNYYNYN
354    pub fn union(&self, other: &Self) -> Self {
355        union_row_selections(&self.selectors, &other.selectors)
356    }
357
358    /// Returns `true` if this [`RowSelection`] selects any rows
359    pub fn selects_any(&self) -> bool {
360        self.selectors.iter().any(|x| !x.skip)
361    }
362
363    /// Trims this [`RowSelection`] removing any trailing skips
364    pub(crate) fn trim(mut self) -> Self {
365        while self.selectors.last().map(|x| x.skip).unwrap_or(false) {
366            self.selectors.pop();
367        }
368        self
369    }
370
371    /// Applies an offset to this [`RowSelection`], skipping the first `offset` selected rows
372    pub(crate) fn offset(mut self, offset: usize) -> Self {
373        if offset == 0 {
374            return self;
375        }
376
377        let mut selected_count = 0;
378        let mut skipped_count = 0;
379
380        // Find the index where the selector exceeds the row count
381        let find = self
382            .selectors
383            .iter()
384            .position(|selector| match selector.skip {
385                true => {
386                    skipped_count += selector.row_count;
387                    false
388                }
389                false => {
390                    selected_count += selector.row_count;
391                    selected_count > offset
392                }
393            });
394
395        let split_idx = match find {
396            Some(idx) => idx,
397            None => {
398                self.selectors.clear();
399                return self;
400            }
401        };
402
403        let mut selectors = Vec::with_capacity(self.selectors.len() - split_idx + 1);
404        selectors.push(RowSelector::skip(skipped_count + offset));
405        selectors.push(RowSelector::select(selected_count - offset));
406        selectors.extend_from_slice(&self.selectors[split_idx + 1..]);
407
408        Self { selectors }
409    }
410
411    /// Limit this [`RowSelection`] to only select `limit` rows
412    pub(crate) fn limit(mut self, mut limit: usize) -> Self {
413        if limit == 0 {
414            self.selectors.clear();
415        }
416
417        for (idx, selection) in self.selectors.iter_mut().enumerate() {
418            if !selection.skip {
419                if selection.row_count >= limit {
420                    selection.row_count = limit;
421                    self.selectors.truncate(idx + 1);
422                    break;
423                } else {
424                    limit -= selection.row_count;
425                }
426            }
427        }
428        self
429    }
430
431    /// Returns an iterator over the [`RowSelector`]s for this
432    /// [`RowSelection`].
433    pub fn iter(&self) -> impl Iterator<Item = &RowSelector> {
434        self.selectors.iter()
435    }
436
437    /// Returns the number of selected rows
438    pub fn row_count(&self) -> usize {
439        self.iter().filter(|s| !s.skip).map(|s| s.row_count).sum()
440    }
441
442    /// Returns the number of de-selected rows
443    pub fn skipped_row_count(&self) -> usize {
444        self.iter().filter(|s| s.skip).map(|s| s.row_count).sum()
445    }
446
447    /// Expands the selection to align with batch boundaries.
448    /// This is needed when using cached array readers to ensure that
449    /// the cached data covers full batches.
450    #[cfg(feature = "async")]
451    pub(crate) fn expand_to_batch_boundaries(&self, batch_size: usize, total_rows: usize) -> Self {
452        if batch_size == 0 {
453            return self.clone();
454        }
455
456        let mut expanded_ranges = Vec::new();
457        let mut row_offset = 0;
458
459        for selector in &self.selectors {
460            if selector.skip {
461                row_offset += selector.row_count;
462            } else {
463                let start = row_offset;
464                let end = row_offset + selector.row_count;
465
466                // Expand start to batch boundary
467                let expanded_start = (start / batch_size) * batch_size;
468                // Expand end to batch boundary
469                let expanded_end = end.div_ceil(batch_size) * batch_size;
470                let expanded_end = expanded_end.min(total_rows);
471
472                expanded_ranges.push(expanded_start..expanded_end);
473                row_offset += selector.row_count;
474            }
475        }
476
477        // Sort ranges by start position
478        expanded_ranges.sort_by_key(|range| range.start);
479
480        // Merge overlapping or consecutive ranges
481        let mut merged_ranges: Vec<Range<usize>> = Vec::new();
482        for range in expanded_ranges {
483            if let Some(last) = merged_ranges.last_mut() {
484                if range.start <= last.end {
485                    // Overlapping or consecutive - merge them
486                    last.end = last.end.max(range.end);
487                } else {
488                    // No overlap - add new range
489                    merged_ranges.push(range);
490                }
491            } else {
492                // First range
493                merged_ranges.push(range);
494            }
495        }
496
497        Self::from_consecutive_ranges(merged_ranges.into_iter(), total_rows)
498    }
499}
500
501impl From<Vec<RowSelector>> for RowSelection {
502    fn from(selectors: Vec<RowSelector>) -> Self {
503        selectors.into_iter().collect()
504    }
505}
506
507impl FromIterator<RowSelector> for RowSelection {
508    fn from_iter<T: IntoIterator<Item = RowSelector>>(iter: T) -> Self {
509        let iter = iter.into_iter();
510
511        // Capacity before filter
512        let mut selectors = Vec::with_capacity(iter.size_hint().0);
513
514        let mut filtered = iter.filter(|x| x.row_count != 0);
515        if let Some(x) = filtered.next() {
516            selectors.push(x);
517        }
518
519        for s in filtered {
520            if s.row_count == 0 {
521                continue;
522            }
523
524            // Combine consecutive selectors
525            let last = selectors.last_mut().unwrap();
526            if last.skip == s.skip {
527                last.row_count = last.row_count.checked_add(s.row_count).unwrap();
528            } else {
529                selectors.push(s)
530            }
531        }
532
533        Self { selectors }
534    }
535}
536
537impl From<RowSelection> for Vec<RowSelector> {
538    fn from(r: RowSelection) -> Self {
539        r.selectors
540    }
541}
542
543impl From<RowSelection> for VecDeque<RowSelector> {
544    fn from(r: RowSelection) -> Self {
545        r.selectors.into()
546    }
547}
548
549/// Combine two lists of `RowSelection` return the intersection of them
550/// For example:
551/// self:      NNYYYYNNYYNYN
552/// other:     NYNNNNNNY
553///
554/// returned:  NNNNNNNNYYNYN
555fn intersect_row_selections(left: &[RowSelector], right: &[RowSelector]) -> RowSelection {
556    let mut l_iter = left.iter().copied().peekable();
557    let mut r_iter = right.iter().copied().peekable();
558
559    let iter = std::iter::from_fn(move || {
560        loop {
561            let l = l_iter.peek_mut();
562            let r = r_iter.peek_mut();
563
564            match (l, r) {
565                (Some(a), _) if a.row_count == 0 => {
566                    l_iter.next().unwrap();
567                }
568                (_, Some(b)) if b.row_count == 0 => {
569                    r_iter.next().unwrap();
570                }
571                (Some(l), Some(r)) => {
572                    return match (l.skip, r.skip) {
573                        // Keep both ranges
574                        (false, false) => {
575                            if l.row_count < r.row_count {
576                                r.row_count -= l.row_count;
577                                l_iter.next()
578                            } else {
579                                l.row_count -= r.row_count;
580                                r_iter.next()
581                            }
582                        }
583                        // skip at least one
584                        _ => {
585                            if l.row_count < r.row_count {
586                                let skip = l.row_count;
587                                r.row_count -= l.row_count;
588                                l_iter.next();
589                                Some(RowSelector::skip(skip))
590                            } else {
591                                let skip = r.row_count;
592                                l.row_count -= skip;
593                                r_iter.next();
594                                Some(RowSelector::skip(skip))
595                            }
596                        }
597                    };
598                }
599                (Some(_), None) => return l_iter.next(),
600                (None, Some(_)) => return r_iter.next(),
601                (None, None) => return None,
602            }
603        }
604    });
605
606    iter.collect()
607}
608
609/// Combine two lists of `RowSelector` return the union of them
610/// For example:
611/// self:      NNYYYYNNYYNYN
612/// other:     NYNNNNNNY
613///
614/// returned:  NYYYYYNNYYNYN
615///
616/// This can be removed from here once RowSelection::union is in parquet::arrow
617fn union_row_selections(left: &[RowSelector], right: &[RowSelector]) -> RowSelection {
618    let mut l_iter = left.iter().copied().peekable();
619    let mut r_iter = right.iter().copied().peekable();
620
621    let iter = std::iter::from_fn(move || {
622        loop {
623            let l = l_iter.peek_mut();
624            let r = r_iter.peek_mut();
625
626            match (l, r) {
627                (Some(a), _) if a.row_count == 0 => {
628                    l_iter.next().unwrap();
629                }
630                (_, Some(b)) if b.row_count == 0 => {
631                    r_iter.next().unwrap();
632                }
633                (Some(l), Some(r)) => {
634                    return match (l.skip, r.skip) {
635                        // Skip both ranges
636                        (true, true) => {
637                            if l.row_count < r.row_count {
638                                let skip = l.row_count;
639                                r.row_count -= l.row_count;
640                                l_iter.next();
641                                Some(RowSelector::skip(skip))
642                            } else {
643                                let skip = r.row_count;
644                                l.row_count -= skip;
645                                r_iter.next();
646                                Some(RowSelector::skip(skip))
647                            }
648                        }
649                        // Keep rows from left
650                        (false, true) => {
651                            if l.row_count < r.row_count {
652                                r.row_count -= l.row_count;
653                                l_iter.next()
654                            } else {
655                                let r_row_count = r.row_count;
656                                l.row_count -= r_row_count;
657                                r_iter.next();
658                                Some(RowSelector::select(r_row_count))
659                            }
660                        }
661                        // Keep rows from right
662                        (true, false) => {
663                            if l.row_count < r.row_count {
664                                let l_row_count = l.row_count;
665                                r.row_count -= l_row_count;
666                                l_iter.next();
667                                Some(RowSelector::select(l_row_count))
668                            } else {
669                                l.row_count -= r.row_count;
670                                r_iter.next()
671                            }
672                        }
673                        // Keep at least one
674                        _ => {
675                            if l.row_count < r.row_count {
676                                r.row_count -= l.row_count;
677                                l_iter.next()
678                            } else {
679                                l.row_count -= r.row_count;
680                                r_iter.next()
681                            }
682                        }
683                    };
684                }
685                (Some(_), None) => return l_iter.next(),
686                (None, Some(_)) => return r_iter.next(),
687                (None, None) => return None,
688            }
689        }
690    });
691
692    iter.collect()
693}
694
695#[cfg(test)]
696mod tests {
697    use super::*;
698    use rand::{Rng, rng};
699
    /// Checks `RowSelection::from_filters` on growing prefixes of a list of boolean
    /// masks, including runs that continue across mask boundaries and an empty mask.
    #[test]
    fn test_from_filters() {
        let filters = vec![
            BooleanArray::from(vec![false, false, false, true, true, true, true]),
            BooleanArray::from(vec![true, true, false, false, true, true, true]),
            BooleanArray::from(vec![false, false, false, false]),
            BooleanArray::from(Vec::<bool>::new()),
        ];

        // A single mask: 3 leading `false` values, then 4 `true` values
        let selection = RowSelection::from_filters(&filters[..1]);
        assert!(selection.selects_any());
        assert_eq!(
            selection.selectors,
            vec![RowSelector::skip(3), RowSelector::select(4)]
        );

        // The trailing `true` run of the first mask joins the leading `true` run of the second
        let selection = RowSelection::from_filters(&filters[..2]);
        assert!(selection.selects_any());
        assert_eq!(
            selection.selectors,
            vec![
                RowSelector::skip(3),
                RowSelector::select(6),
                RowSelector::skip(2),
                RowSelector::select(3)
            ]
        );

        // All masks: the all-`false` mask adds a trailing skip, the empty mask adds nothing
        let selection = RowSelection::from_filters(&filters);
        assert!(selection.selects_any());
        assert_eq!(
            selection.selectors,
            vec![
                RowSelector::skip(3),
                RowSelector::select(6),
                RowSelector::skip(2),
                RowSelector::select(3),
                RowSelector::skip(4)
            ]
        );

        // Only the all-`false` mask: nothing is selected
        let selection = RowSelection::from_filters(&filters[2..3]);
        assert!(!selection.selects_any());
        assert_eq!(selection.selectors, vec![RowSelector::skip(4)]);
    }
745
    /// Checks `RowSelection::split_off` when the split lands on a selector boundary,
    /// inside a selector, across several selectors, and past the end.
    #[test]
    fn test_split_off() {
        let mut selection = RowSelection::from(vec![
            RowSelector::skip(34),
            RowSelector::select(12),
            RowSelector::skip(3),
            RowSelector::select(35),
        ]);

        // Split exactly at the end of the first selector
        let split = selection.split_off(34);
        assert_eq!(split.selectors, vec![RowSelector::skip(34)]);
        assert_eq!(
            selection.selectors,
            vec![
                RowSelector::select(12),
                RowSelector::skip(3),
                RowSelector::select(35)
            ]
        );

        // Split in the middle of a selector: it is divided into 5 + 7 rows
        let split = selection.split_off(5);
        assert_eq!(split.selectors, vec![RowSelector::select(5)]);
        assert_eq!(
            selection.selectors,
            vec![
                RowSelector::select(7),
                RowSelector::skip(3),
                RowSelector::select(35)
            ]
        );

        // Split covering a whole selector plus part of the next one
        let split = selection.split_off(8);
        assert_eq!(
            split.selectors,
            vec![RowSelector::select(7), RowSelector::skip(1)]
        );
        assert_eq!(
            selection.selectors,
            vec![RowSelector::skip(2), RowSelector::select(35)]
        );

        // Asking for more rows than remain returns everything and empties the selection
        let split = selection.split_off(200);
        assert_eq!(
            split.selectors,
            vec![RowSelector::skip(2), RowSelector::select(35)]
        );
        assert!(selection.selectors.is_empty());
    }
794
    /// Checks `RowSelection::offset` by applying successive offsets to the same
    /// selection, each one operating on the result of the previous call.
    #[test]
    fn test_offset() {
        let selection = RowSelection::from(vec![
            RowSelector::select(5),
            RowSelector::skip(23),
            RowSelector::select(7),
            RowSelector::skip(33),
            RowSelector::select(6),
        ]);

        // Offset inside the first selected run: 2 rows become a leading skip
        let selection = selection.offset(2);
        assert_eq!(
            selection.selectors,
            vec![
                RowSelector::skip(2),
                RowSelector::select(3),
                RowSelector::skip(23),
                RowSelector::select(7),
                RowSelector::skip(33),
                RowSelector::select(6),
            ]
        );

        // Offset past the first selected run and into the second one
        let selection = selection.offset(5);
        assert_eq!(
            selection.selectors,
            vec![
                RowSelector::skip(30),
                RowSelector::select(5),
                RowSelector::skip(33),
                RowSelector::select(6),
            ]
        );

        // Offset that stays inside the first remaining selected run
        let selection = selection.offset(3);
        assert_eq!(
            selection.selectors,
            vec![
                RowSelector::skip(33),
                RowSelector::select(2),
                RowSelector::skip(33),
                RowSelector::select(6),
            ]
        );

        // Offset equal to the size of a selected run: the run is consumed entirely
        let selection = selection.offset(2);
        assert_eq!(
            selection.selectors,
            vec![RowSelector::skip(68), RowSelector::select(6),]
        );

        // Offset inside the final selected run
        let selection = selection.offset(3);
        assert_eq!(
            selection.selectors,
            vec![RowSelector::skip(71), RowSelector::select(3),]
        );
    }
852
    /// Checks `RowSelection::and_then`, where the second selection is expressed
    /// relative to the rows selected by the first.
    #[test]
    fn test_and() {
        // `a` selects 23 + 5 = 28 rows
        let mut a = RowSelection::from(vec![
            RowSelector::skip(12),
            RowSelector::select(23),
            RowSelector::skip(3),
            RowSelector::select(5),
        ]);

        // `b` covers 5 + 4 + 15 + 4 = 28 rows, matching the rows selected by `a`
        let b = RowSelection::from(vec![
            RowSelector::select(5),
            RowSelector::skip(4),
            RowSelector::select(15),
            RowSelector::skip(4),
        ]);

        let mut expected = RowSelection::from(vec![
            RowSelector::skip(12),
            RowSelector::select(5),
            RowSelector::skip(4),
            RowSelector::select(14),
            RowSelector::skip(3),
            RowSelector::select(1),
            RowSelector::skip(4),
        ]);

        assert_eq!(a.and_then(&b), expected);

        // Removing the first 7 rows only shortens the leading skip on both sides
        a.split_off(7);
        expected.split_off(7);
        assert_eq!(a.and_then(&b), expected);

        // Trailing skips of the first selection are carried into the result
        let a = RowSelection::from(vec![RowSelector::select(5), RowSelector::skip(3)]);

        let b = RowSelection::from(vec![
            RowSelector::select(2),
            RowSelector::skip(1),
            RowSelector::select(1),
            RowSelector::skip(1),
        ]);

        assert_eq!(
            a.and_then(&b).selectors,
            vec![
                RowSelector::select(2),
                RowSelector::skip(1),
                RowSelector::select(1),
                RowSelector::skip(4)
            ]
        );
    }
904
#[test]
fn test_combine() {
    // Each input below must collapse to this same three-selector form when
    // collected through `FromIterator`.
    let expected = RowSelection::from(vec![
        RowSelector::skip(6),
        RowSelector::select(10),
        RowSelector::skip(4),
    ]);

    // Two adjacent skips at the front.
    let adjacent_skips = vec![
        RowSelector::skip(3),
        RowSelector::skip(3),
        RowSelector::select(10),
        RowSelector::skip(4),
    ];

    // Same as above, followed by a zero-length skip.
    let trailing_empty = vec![
        RowSelector::skip(3),
        RowSelector::skip(3),
        RowSelector::select(10),
        RowSelector::skip(4),
        RowSelector::skip(0),
    ];

    // Runs of both kinds split into several pieces, plus a zero-length skip.
    let fragmented = vec![
        RowSelector::skip(2),
        RowSelector::skip(4),
        RowSelector::select(3),
        RowSelector::select(3),
        RowSelector::select(4),
        RowSelector::skip(3),
        RowSelector::skip(1),
        RowSelector::skip(0),
    ];

    for input in [adjacent_skips, trailing_empty, fragmented] {
        assert_eq!(RowSelection::from_iter(input), expected);
    }
}
943
#[test]
fn test_combine_2elements() {
    // (input selectors, selectors expected after collecting) for every
    // select/skip pairing of two elements.
    let cases = [
        (
            vec![RowSelector::select(10), RowSelector::select(5)],
            vec![RowSelector::select(15)],
        ),
        (
            vec![RowSelector::select(10), RowSelector::skip(5)],
            vec![RowSelector::select(10), RowSelector::skip(5)],
        ),
        (
            vec![RowSelector::skip(10), RowSelector::select(5)],
            vec![RowSelector::skip(10), RowSelector::select(5)],
        ),
        (
            vec![RowSelector::skip(10), RowSelector::skip(5)],
            vec![RowSelector::skip(15)],
        ),
    ];

    for (input, merged) in cases {
        assert_eq!(RowSelection::from_iter(input).selectors, merged);
    }
}
962
#[test]
fn test_from_one_and_empty() {
    // A single selector passes through `From<Vec<_>>` unchanged.
    let single = vec![RowSelector::select(10)];
    let from_single = RowSelection::from(single.clone());
    assert_eq!(from_single.selectors, single);

    // An empty input yields an empty selection.
    let empty: Vec<RowSelector> = Vec::new();
    let from_empty = RowSelection::from(empty.clone());
    assert_eq!(from_empty.selectors, empty);
}
973
#[test]
#[should_panic(expected = "selection exceeds the number of selected rows")]
fn test_and_longer() {
    // `base` selects only 3 + 3 = 6 rows, while `too_long` spans 36 rows.
    let too_long = RowSelection::from(vec![RowSelector::select(36)]);
    let base = RowSelection::from(vec![
        RowSelector::select(3),
        RowSelector::skip(33),
        RowSelector::select(3),
        RowSelector::skip(33),
    ]);
    let _ = base.and_then(&too_long);
}
986
#[test]
#[should_panic(expected = "selection contains less than the number of selected rows")]
fn test_and_shorter() {
    // `base` selects 3 + 3 = 6 rows, while `too_short` spans only 3 rows.
    let too_short = RowSelection::from(vec![RowSelector::select(3)]);
    let base = RowSelection::from(vec![
        RowSelector::select(3),
        RowSelector::skip(33),
        RowSelector::select(3),
        RowSelector::skip(33),
    ]);
    let _ = base.and_then(&too_short);
}
999
#[test]
fn test_intersect_row_selection_and_combine() {
    // "size" in the case labels below is the number of selectors in each
    // input slice; within each case both inputs span the same number of rows.

    // a size equal b size
    let a = vec![
        RowSelector::select(5),
        RowSelector::skip(4),
        RowSelector::select(1),
    ];
    let b = vec![
        RowSelector::select(8),
        RowSelector::skip(1),
        RowSelector::select(1),
    ];

    let res = intersect_row_selections(&a, &b);
    assert_eq!(
        res.selectors,
        vec![
            RowSelector::select(5),
            RowSelector::skip(4),
            RowSelector::select(1),
        ],
    );

    // a size larger than b size
    let a = vec![
        RowSelector::select(3),
        RowSelector::skip(33),
        RowSelector::select(3),
        RowSelector::skip(33),
    ];
    let b = vec![RowSelector::select(36), RowSelector::skip(36)];
    let res = intersect_row_selections(&a, &b);
    // Adjacent skips in the output are expected to be merged into one.
    assert_eq!(
        res.selectors,
        vec![RowSelector::select(3), RowSelector::skip(69)]
    );

    // a size less than b size
    let a = vec![RowSelector::select(3), RowSelector::skip(7)];
    let b = vec![
        RowSelector::select(2),
        RowSelector::skip(2),
        RowSelector::select(2),
        RowSelector::skip(2),
        RowSelector::select(2),
    ];
    let res = intersect_row_selections(&a, &b);
    assert_eq!(
        res.selectors,
        vec![RowSelector::select(2), RowSelector::skip(8)]
    );
}
1067
#[test]
fn test_and_fuzz() {
    let mut generator = rng();
    for _ in 0..100 {
        // Random first selection over `a_len` rows.
        let a_len: usize = generator.random_range(10..100);
        let a_bools: Vec<bool> = (0..a_len).map(|_| generator.random_bool(0.2)).collect();
        let a = RowSelection::from_filters(&[BooleanArray::from(a_bools.clone())]);

        // Random second selection with one entry per row selected by `a`.
        let b_len = a_bools.iter().filter(|&&selected| selected).count();
        let b_bools: Vec<bool> = (0..b_len).map(|_| generator.random_bool(0.8)).collect();
        let b = RowSelection::from_filters(&[BooleanArray::from(b_bools.clone())]);

        // Reference result: a row survives only when `a` selects it and the
        // next unused entry of `b` selects it as well. An entry of `b` is
        // consumed only for rows that `a` selects.
        let mut remaining_b = b_bools.iter().copied();
        let expected_bools: Vec<bool> = a_bools
            .iter()
            .map(|&in_a| in_a && remaining_b.next().unwrap())
            .collect();

        let expected = RowSelection::from_filters(&[BooleanArray::from(expected_bools)]);

        // The reference selection must account for every row of `a`.
        let total_rows: usize = expected
            .selectors
            .iter()
            .map(|selector| selector.row_count)
            .sum();
        assert_eq!(a_len, total_rows);

        assert_eq!(a.and_then(&b), expected);
    }
}
1097
#[test]
fn test_iter() {
    // Exercise the iter() API so that it stays covered and is not removed
    // by accident.
    let original = vec![
        RowSelector::select(3),
        RowSelector::skip(33),
        RowSelector::select(4),
    ];

    let selection = RowSelection::from(original.clone());
    let round_tripped: Vec<RowSelector> = selection.iter().copied().collect();
    assert_eq!(original, round_tripped);
}
1114
#[test]
fn test_limit() {
    // Limiting to exactly the number of rows already selected keeps the
    // select; the expectation shows the trailing skip is not retained.
    let limited =
        RowSelection::from(vec![RowSelector::select(10), RowSelector::skip(90)]).limit(10);
    assert_eq!(RowSelection::from(vec![RowSelector::select(10)]), limited);

    // Three selects of 10 rows separated by skips of 10 rows.
    let all_selectors = vec![
        RowSelector::select(10),
        RowSelector::skip(10),
        RowSelector::select(10),
        RowSelector::skip(10),
        RowSelector::select(10),
    ];
    let selection = RowSelection::from(all_selectors.clone());

    // (limit, selectors expected after applying it)
    let cases = [
        // Cut inside the first select.
        (5, vec![RowSelector::select(5)]),
        // Cut inside the second select.
        (
            15,
            vec![
                RowSelector::select(10),
                RowSelector::skip(10),
                RowSelector::select(5),
            ],
        ),
        // Nothing may be selected.
        (0, vec![]),
        // Exactly the number of selected rows.
        (30, all_selectors.clone()),
        // More than the number of selected rows.
        (100, all_selectors),
    ];

    for (limit, expected) in cases {
        assert_eq!(selection.clone().limit(limit).selectors, expected);
    }
}
1166
#[test]
fn test_scan_ranges() {
    // Seven pages: page `n` has offset `10 * n`, compressed size 10 and
    // first row index `10 * n`. Offsets and first-row indices therefore
    // coincide numerically, so the expected ranges below can be read
    // against the row positions in each selection.
    let index: Vec<PageLocation> = (0..7)
        .map(|page| PageLocation {
            offset: page * 10,
            compressed_page_size: 10,
            first_row_index: page * 10,
        })
        .collect();

    let selection = RowSelection::from(vec![
        // Skip first page
        RowSelector::skip(10),
        // Multiple selects in same page
        RowSelector::select(3),
        RowSelector::skip(3),
        RowSelector::select(4),
        // Select to page boundary
        RowSelector::skip(5),
        RowSelector::select(5),
        // Skip full page past page boundary
        RowSelector::skip(12),
        // Select across page boundaries
        RowSelector::select(12),
        // Skip final page
        RowSelector::skip(12),
    ]);

    let ranges = selection.scan_ranges(&index);

    // Pages 1, 2, 4 and 5 are needed; pages 0, 3 and 6 are not.
    assert_eq!(ranges, vec![10..20, 20..30, 40..50, 50..60]);

    let selection = RowSelection::from(vec![
        // Skip first page
        RowSelector::skip(10),
        // Multiple selects in same page
        RowSelector::select(3),
        RowSelector::skip(3),
        RowSelector::select(4),
        // Select to page boundary
        RowSelector::skip(5),
        RowSelector::select(5),
        // Skip full page past page boundary
        RowSelector::skip(12),
        // Select across page boundaries
        RowSelector::select(12),
        RowSelector::skip(1),
        // Select across page boundaries including final page
        RowSelector::select(8),
    ]);

    let ranges = selection.scan_ranges(&index);

    // Pages 1, 2, 4, 5 and 6 are needed; pages 0 and 3 are not.
    assert_eq!(ranges, vec![10..20, 20..30, 40..50, 50..60, 60..70]);

    let selection = RowSelection::from(vec![
        // Skip first page
        RowSelector::skip(10),
        // Multiple selects in same page
        RowSelector::select(3),
        RowSelector::skip(3),
        RowSelector::select(4),
        // Select to page boundary
        RowSelector::skip(5),
        RowSelector::select(5),
        // Skip full page past page boundary
        RowSelector::skip(12),
        // Select across a page boundary (rows 42..54)
        RowSelector::select(12),
        RowSelector::skip(1),
        // Skip across final page boundary
        RowSelector::skip(8),
        // Select from final page
        RowSelector::select(4),
    ]);

    let ranges = selection.scan_ranges(&index);

    // Pages 1, 2, 4, 5 and 6 are needed; pages 0 and 3 are not.
    assert_eq!(ranges, vec![10..20, 20..30, 40..50, 50..60, 60..70]);

    let selection = RowSelection::from(vec![
        // Skip first page
        RowSelector::skip(10),
        // Multiple selects in same page
        RowSelector::select(3),
        RowSelector::skip(3),
        RowSelector::select(4),
        // Select to remaining in page and first row of next page
        RowSelector::skip(5),
        RowSelector::select(6),
        // Skip remaining
        RowSelector::skip(50),
    ]);

    let ranges = selection.scan_ranges(&index);

    // Pages 1, 2 and 3 are needed; page 3 only for its first row.
    assert_eq!(ranges, vec![10..20, 20..30, 30..40]);
}
1299
#[test]
fn test_from_ranges() {
    // Ascending ranges, two of them empty (6..6 and 8..8), over 10 rows.
    let ascending = [1..3, 4..6, 6..6, 8..8, 9..10];
    let expected = vec![
        RowSelector::skip(1),
        RowSelector::select(2),
        RowSelector::skip(1),
        RowSelector::select(2),
        RowSelector::skip(3),
        RowSelector::select(1),
    ];
    let selection = RowSelection::from_consecutive_ranges(ascending.into_iter(), 10);
    assert_eq!(selection.selectors, expected);

    // Ranges that are not in ascending order must make the constructor panic.
    let out_of_order = [1..3, 8..10, 4..7];
    let outcome = std::panic::catch_unwind(|| {
        RowSelection::from_consecutive_ranges(out_of_order.into_iter(), 10)
    });
    assert!(outcome.is_err());
}
1322
#[test]
fn test_empty_selector() {
    // Zero-length skips vanish, leaving the two selects to be merged.
    let with_empty_skips = RowSelection::from(vec![
        RowSelector::skip(0),
        RowSelector::select(2),
        RowSelector::skip(0),
        RowSelector::select(2),
    ]);
    let merged_select = vec![RowSelector::select(4)];
    assert_eq!(with_empty_skips.selectors, merged_select);

    // Zero-length selects vanish, leaving the two skips to be merged.
    let with_empty_selects = RowSelection::from(vec![
        RowSelector::select(0),
        RowSelector::skip(2),
        RowSelector::select(0),
        RowSelector::skip(2),
    ]);
    let merged_skip = vec![RowSelector::skip(4)];
    assert_eq!(with_empty_selects.selectors, merged_skip);
}
1341
#[test]
fn test_intersection() {
    // Intersecting a selection with itself gives the same selection back.
    let everything = RowSelection::from(vec![RowSelector::select(1048576)]);
    assert_eq!(everything.intersection(&everything), everything);

    // `lhs` selects rows 10..20 and 30..50; `rhs` selects rows 20..40.
    // Only rows 30..40 are selected by both.
    let lhs = RowSelection::from(vec![
        RowSelector::skip(10),
        RowSelector::select(10),
        RowSelector::skip(10),
        RowSelector::select(20),
    ]);
    let rhs = RowSelection::from(vec![
        RowSelector::skip(20),
        RowSelector::select(20),
        RowSelector::skip(10),
    ]);

    let overlap = lhs.intersection(&rhs);
    let expected = vec![
        RowSelector::skip(30),
        RowSelector::select(10),
        RowSelector::skip(10),
    ];
    assert_eq!(overlap.selectors, expected);
}
1371
#[test]
fn test_union() {
    // The union of a selection with itself is that same selection.
    let everything = RowSelection::from(vec![RowSelector::select(1048576)]);
    assert_eq!(everything.union(&everything), everything);

    // In the patterns below each letter stands for 10 rows:
    // Y = selected, N = skipped.

    // NYNYY
    let lhs = RowSelection::from(vec![
        RowSelector::skip(10),
        RowSelector::select(10),
        RowSelector::skip(10),
        RowSelector::select(20),
    ]);

    // NNYYNYN
    let rhs = RowSelection::from(vec![
        RowSelector::skip(20),
        RowSelector::select(20),
        RowSelector::skip(10),
        RowSelector::select(10),
        RowSelector::skip(10),
    ]);

    // NYYYYYN
    let expected = vec![
        RowSelector::skip(10),
        RowSelector::select(50),
        RowSelector::skip(10),
    ];

    let combined = lhs.union(&rhs);
    let observed: Vec<RowSelector> = combined.iter().copied().collect();
    assert_eq!(observed, expected);
}
1407
#[test]
fn test_row_count() {
    // (selectors, expected row_count(), expected skipped_row_count())
    let cases = [
        // Mixed skips and selects.
        (
            vec![
                RowSelector::skip(34),
                RowSelector::select(12),
                RowSelector::skip(3),
                RowSelector::select(35),
            ],
            12 + 35,
            34 + 3,
        ),
        // Selects only.
        (
            vec![RowSelector::select(12), RowSelector::select(35)],
            12 + 35,
            0,
        ),
        // Skips only.
        (
            vec![RowSelector::skip(34), RowSelector::skip(3)],
            0,
            34 + 3,
        ),
        // Empty selection.
        (vec![], 0, 0),
    ];

    for (selectors, selected, skipped) in cases {
        let selection = RowSelection::from(selectors);
        assert_eq!(selection.row_count(), selected);
        assert_eq!(selection.skipped_row_count(), skipped);
    }
}
1435}