parquet/arrow/arrow_reader/
selection.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use arrow_array::{Array, BooleanArray};
19use arrow_select::filter::SlicesIterator;
20use std::cmp::Ordering;
21use std::collections::VecDeque;
22use std::ops::Range;
23
24use crate::file::page_index::offset_index::PageLocation;
25
26/// [`RowSelection`] is a collection of [`RowSelector`] used to skip rows when
27/// scanning a parquet file
28#[derive(Debug, Clone, Copy, Eq, PartialEq)]
29pub struct RowSelector {
30    /// The number of rows
31    pub row_count: usize,
32
33    /// If true, skip `row_count` rows
34    pub skip: bool,
35}
36
37impl RowSelector {
38    /// Select `row_count` rows
39    pub fn select(row_count: usize) -> Self {
40        Self {
41            row_count,
42            skip: false,
43        }
44    }
45
46    /// Skip `row_count` rows
47    pub fn skip(row_count: usize) -> Self {
48        Self {
49            row_count,
50            skip: true,
51        }
52    }
53}
54
55/// [`RowSelection`] allows selecting or skipping a provided number of rows
56/// when scanning the parquet file.
57///
58/// This is applied prior to reading column data, and can therefore
59/// be used to skip IO to fetch data into memory
60///
61/// A typical use-case would be using the [`PageIndex`] to filter out rows
62/// that don't satisfy a predicate
63///
64/// # Example
65/// ```
66/// use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
67///
68/// let selectors = vec![
69///     RowSelector::skip(5),
70///     RowSelector::select(5),
71///     RowSelector::select(5),
72///     RowSelector::skip(5),
73/// ];
74///
75/// // Creating a selection will combine adjacent selectors
76/// let selection: RowSelection = selectors.into();
77///
78/// let expected = vec![
79///     RowSelector::skip(5),
80///     RowSelector::select(10),
81///     RowSelector::skip(5),
82/// ];
83///
84/// let actual: Vec<RowSelector> = selection.into();
85/// assert_eq!(actual, expected);
86///
87/// // you can also create a selection from consecutive ranges
88/// let ranges = vec![5..10, 10..15];
89/// let selection =
90///   RowSelection::from_consecutive_ranges(ranges.into_iter(), 20);
91/// let actual: Vec<RowSelector> = selection.into();
92/// assert_eq!(actual, expected);
93/// ```
94///
95/// A [`RowSelection`] maintains the following invariants:
96///
97/// * It contains no [`RowSelector`] of 0 rows
98/// * Consecutive [`RowSelector`]s alternate skipping or selecting rows
99///
100/// [`PageIndex`]: crate::file::page_index::column_index::ColumnIndexMetaData
101#[derive(Debug, Clone, Default, Eq, PartialEq)]
102pub struct RowSelection {
103    selectors: Vec<RowSelector>,
104}
105
106impl RowSelection {
107    /// Creates a [`RowSelection`] from a slice of [`BooleanArray`]
108    ///
109    /// # Panic
110    ///
111    /// Panics if any of the [`BooleanArray`] contain nulls
112    pub fn from_filters(filters: &[BooleanArray]) -> Self {
113        let mut next_offset = 0;
114        let total_rows = filters.iter().map(|x| x.len()).sum();
115
116        let iter = filters.iter().flat_map(|filter| {
117            let offset = next_offset;
118            next_offset += filter.len();
119            assert_eq!(filter.null_count(), 0);
120            SlicesIterator::new(filter).map(move |(start, end)| start + offset..end + offset)
121        });
122
123        Self::from_consecutive_ranges(iter, total_rows)
124    }
125
126    /// Creates a [`RowSelection`] from an iterator of consecutive ranges to keep
127    pub fn from_consecutive_ranges<I: Iterator<Item = Range<usize>>>(
128        ranges: I,
129        total_rows: usize,
130    ) -> Self {
131        let mut selectors: Vec<RowSelector> = Vec::with_capacity(ranges.size_hint().0);
132        let mut last_end = 0;
133        for range in ranges {
134            let len = range.end - range.start;
135            if len == 0 {
136                continue;
137            }
138
139            match range.start.cmp(&last_end) {
140                Ordering::Equal => match selectors.last_mut() {
141                    Some(last) => last.row_count = last.row_count.checked_add(len).unwrap(),
142                    None => selectors.push(RowSelector::select(len)),
143                },
144                Ordering::Greater => {
145                    selectors.push(RowSelector::skip(range.start - last_end));
146                    selectors.push(RowSelector::select(len))
147                }
148                Ordering::Less => panic!("out of order"),
149            }
150            last_end = range.end;
151        }
152
153        if last_end != total_rows {
154            selectors.push(RowSelector::skip(total_rows - last_end))
155        }
156
157        Self { selectors }
158    }
159
160    /// Given an offset index, return the byte ranges for all data pages selected by `self`
161    ///
162    /// This is useful for determining what byte ranges to fetch from underlying storage
163    ///
164    /// Note: this method does not make any effort to combine consecutive ranges, nor coalesce
165    /// ranges that are close together. This is instead delegated to the IO subsystem to optimise,
166    /// e.g. [`ObjectStore::get_ranges`](object_store::ObjectStore::get_ranges)
167    pub fn scan_ranges(&self, page_locations: &[PageLocation]) -> Vec<Range<u64>> {
168        let mut ranges: Vec<Range<u64>> = vec![];
169        let mut row_offset = 0;
170
171        let mut pages = page_locations.iter().peekable();
172        let mut selectors = self.selectors.iter().cloned();
173        let mut current_selector = selectors.next();
174        let mut current_page = pages.next();
175
176        let mut current_page_included = false;
177
178        while let Some((selector, page)) = current_selector.as_mut().zip(current_page) {
179            if !(selector.skip || current_page_included) {
180                let start = page.offset as u64;
181                let end = start + page.compressed_page_size as u64;
182                ranges.push(start..end);
183                current_page_included = true;
184            }
185
186            if let Some(next_page) = pages.peek() {
187                if row_offset + selector.row_count > next_page.first_row_index as usize {
188                    let remaining_in_page = next_page.first_row_index as usize - row_offset;
189                    selector.row_count -= remaining_in_page;
190                    row_offset += remaining_in_page;
191                    current_page = pages.next();
192                    current_page_included = false;
193
194                    continue;
195                } else {
196                    if row_offset + selector.row_count == next_page.first_row_index as usize {
197                        current_page = pages.next();
198                        current_page_included = false;
199                    }
200                    row_offset += selector.row_count;
201                    current_selector = selectors.next();
202                }
203            } else {
204                if !(selector.skip || current_page_included) {
205                    let start = page.offset as u64;
206                    let end = start + page.compressed_page_size as u64;
207                    ranges.push(start..end);
208                }
209                current_selector = selectors.next()
210            }
211        }
212
213        ranges
214    }
215
216    /// Splits off the first `row_count` from this [`RowSelection`]
217    pub fn split_off(&mut self, row_count: usize) -> Self {
218        let mut total_count = 0;
219
220        // Find the index where the selector exceeds the row count
221        let find = self.selectors.iter().position(|selector| {
222            total_count += selector.row_count;
223            total_count > row_count
224        });
225
226        let split_idx = match find {
227            Some(idx) => idx,
228            None => {
229                let selectors = std::mem::take(&mut self.selectors);
230                return Self { selectors };
231            }
232        };
233
234        let mut remaining = self.selectors.split_off(split_idx);
235
236        // Always present as `split_idx < self.selectors.len`
237        let next = remaining.first_mut().unwrap();
238        let overflow = total_count - row_count;
239
240        if next.row_count != overflow {
241            self.selectors.push(RowSelector {
242                row_count: next.row_count - overflow,
243                skip: next.skip,
244            })
245        }
246        next.row_count = overflow;
247
248        std::mem::swap(&mut remaining, &mut self.selectors);
249        Self {
250            selectors: remaining,
251        }
252    }
253    /// returns a [`RowSelection`] representing rows that are selected in both
254    /// input [`RowSelection`]s.
255    ///
256    /// This is equivalent to the logical `AND` / conjunction of the two
257    /// selections.
258    ///
259    /// # Example
260    /// If `N` means the row is not selected, and `Y` means it is
261    /// selected:
262    ///
263    /// ```text
264    /// self:     NNNNNNNNNNNNYYYYYYYYYYYYYYYYYYYYYYNNNYYYYY
265    /// other:                YYYYYNNNNYYYYYYYYYYYYY   YYNNN
266    ///
267    /// returned: NNNNNNNNNNNNYYYYYNNNNYYYYYYYYYYYYYNNNYYNNN
268    /// ```
269    ///
270    /// # Panics
271    ///
272    /// Panics if `other` does not have a length equal to the number of rows selected
273    /// by this RowSelection
274    ///
275    pub fn and_then(&self, other: &Self) -> Self {
276        let mut selectors = vec![];
277        let mut first = self.selectors.iter().cloned().peekable();
278        let mut second = other.selectors.iter().cloned().peekable();
279
280        let mut to_skip = 0;
281        while let Some(b) = second.peek_mut() {
282            let a = first
283                .peek_mut()
284                .expect("selection exceeds the number of selected rows");
285
286            if b.row_count == 0 {
287                second.next().unwrap();
288                continue;
289            }
290
291            if a.row_count == 0 {
292                first.next().unwrap();
293                continue;
294            }
295
296            if a.skip {
297                // Records were skipped when producing second
298                to_skip += a.row_count;
299                first.next().unwrap();
300                continue;
301            }
302
303            let skip = b.skip;
304            let to_process = a.row_count.min(b.row_count);
305
306            a.row_count -= to_process;
307            b.row_count -= to_process;
308
309            match skip {
310                true => to_skip += to_process,
311                false => {
312                    if to_skip != 0 {
313                        selectors.push(RowSelector::skip(to_skip));
314                        to_skip = 0;
315                    }
316                    selectors.push(RowSelector::select(to_process))
317                }
318            }
319        }
320
321        for v in first {
322            if v.row_count != 0 {
323                assert!(
324                    v.skip,
325                    "selection contains less than the number of selected rows"
326                );
327                to_skip += v.row_count
328            }
329        }
330
331        if to_skip != 0 {
332            selectors.push(RowSelector::skip(to_skip));
333        }
334
335        Self { selectors }
336    }
337
338    /// Compute the intersection of two [`RowSelection`]
339    /// For example:
340    /// self:      NNYYYYNNYYNYN
341    /// other:     NYNNNNNNY
342    ///
343    /// returned:  NNNNNNNNYYNYN
344    pub fn intersection(&self, other: &Self) -> Self {
345        intersect_row_selections(&self.selectors, &other.selectors)
346    }
347
348    /// Compute the union of two [`RowSelection`]
349    /// For example:
350    /// self:      NNYYYYNNYYNYN
351    /// other:     NYNNNNNNN
352    ///
353    /// returned:  NYYYYYNNYYNYN
354    pub fn union(&self, other: &Self) -> Self {
355        union_row_selections(&self.selectors, &other.selectors)
356    }
357
358    /// Returns `true` if this [`RowSelection`] selects any rows
359    pub fn selects_any(&self) -> bool {
360        self.selectors.iter().any(|x| !x.skip)
361    }
362
363    /// Trims this [`RowSelection`] removing any trailing skips
364    pub(crate) fn trim(mut self) -> Self {
365        while self.selectors.last().map(|x| x.skip).unwrap_or(false) {
366            self.selectors.pop();
367        }
368        self
369    }
370
371    /// Applies an offset to this [`RowSelection`], skipping the first `offset` selected rows
372    pub(crate) fn offset(mut self, offset: usize) -> Self {
373        if offset == 0 {
374            return self;
375        }
376
377        let mut selected_count = 0;
378        let mut skipped_count = 0;
379
380        // Find the index where the selector exceeds the row count
381        let find = self
382            .selectors
383            .iter()
384            .position(|selector| match selector.skip {
385                true => {
386                    skipped_count += selector.row_count;
387                    false
388                }
389                false => {
390                    selected_count += selector.row_count;
391                    selected_count > offset
392                }
393            });
394
395        let split_idx = match find {
396            Some(idx) => idx,
397            None => {
398                self.selectors.clear();
399                return self;
400            }
401        };
402
403        let mut selectors = Vec::with_capacity(self.selectors.len() - split_idx + 1);
404        selectors.push(RowSelector::skip(skipped_count + offset));
405        selectors.push(RowSelector::select(selected_count - offset));
406        selectors.extend_from_slice(&self.selectors[split_idx + 1..]);
407
408        Self { selectors }
409    }
410
411    /// Limit this [`RowSelection`] to only select `limit` rows
412    pub(crate) fn limit(mut self, mut limit: usize) -> Self {
413        if limit == 0 {
414            self.selectors.clear();
415        }
416
417        for (idx, selection) in self.selectors.iter_mut().enumerate() {
418            if !selection.skip {
419                if selection.row_count >= limit {
420                    selection.row_count = limit;
421                    self.selectors.truncate(idx + 1);
422                    break;
423                } else {
424                    limit -= selection.row_count;
425                }
426            }
427        }
428        self
429    }
430
431    /// Returns an iterator over the [`RowSelector`]s for this
432    /// [`RowSelection`].
433    pub fn iter(&self) -> impl Iterator<Item = &RowSelector> {
434        self.selectors.iter()
435    }
436
437    /// Returns the number of selected rows
438    pub fn row_count(&self) -> usize {
439        self.iter().filter(|s| !s.skip).map(|s| s.row_count).sum()
440    }
441
442    /// Returns the number of de-selected rows
443    pub fn skipped_row_count(&self) -> usize {
444        self.iter().filter(|s| s.skip).map(|s| s.row_count).sum()
445    }
446
447    /// Expands the selection to align with batch boundaries.
448    /// This is needed when using cached array readers to ensure that
449    /// the cached data covers full batches.
450    pub(crate) fn expand_to_batch_boundaries(&self, batch_size: usize, total_rows: usize) -> Self {
451        if batch_size == 0 {
452            return self.clone();
453        }
454
455        let mut expanded_ranges = Vec::new();
456        let mut row_offset = 0;
457
458        for selector in &self.selectors {
459            if selector.skip {
460                row_offset += selector.row_count;
461            } else {
462                let start = row_offset;
463                let end = row_offset + selector.row_count;
464
465                // Expand start to batch boundary
466                let expanded_start = (start / batch_size) * batch_size;
467                // Expand end to batch boundary
468                let expanded_end = end.div_ceil(batch_size) * batch_size;
469                let expanded_end = expanded_end.min(total_rows);
470
471                expanded_ranges.push(expanded_start..expanded_end);
472                row_offset += selector.row_count;
473            }
474        }
475
476        // Sort ranges by start position
477        expanded_ranges.sort_by_key(|range| range.start);
478
479        // Merge overlapping or consecutive ranges
480        let mut merged_ranges: Vec<Range<usize>> = Vec::new();
481        for range in expanded_ranges {
482            if let Some(last) = merged_ranges.last_mut() {
483                if range.start <= last.end {
484                    // Overlapping or consecutive - merge them
485                    last.end = last.end.max(range.end);
486                } else {
487                    // No overlap - add new range
488                    merged_ranges.push(range);
489                }
490            } else {
491                // First range
492                merged_ranges.push(range);
493            }
494        }
495
496        Self::from_consecutive_ranges(merged_ranges.into_iter(), total_rows)
497    }
498}
499
500impl From<Vec<RowSelector>> for RowSelection {
501    fn from(selectors: Vec<RowSelector>) -> Self {
502        selectors.into_iter().collect()
503    }
504}
505
506impl FromIterator<RowSelector> for RowSelection {
507    fn from_iter<T: IntoIterator<Item = RowSelector>>(iter: T) -> Self {
508        let iter = iter.into_iter();
509
510        // Capacity before filter
511        let mut selectors = Vec::with_capacity(iter.size_hint().0);
512
513        let mut filtered = iter.filter(|x| x.row_count != 0);
514        if let Some(x) = filtered.next() {
515            selectors.push(x);
516        }
517
518        for s in filtered {
519            if s.row_count == 0 {
520                continue;
521            }
522
523            // Combine consecutive selectors
524            let last = selectors.last_mut().unwrap();
525            if last.skip == s.skip {
526                last.row_count = last.row_count.checked_add(s.row_count).unwrap();
527            } else {
528                selectors.push(s)
529            }
530        }
531
532        Self { selectors }
533    }
534}
535
536impl From<RowSelection> for Vec<RowSelector> {
537    fn from(r: RowSelection) -> Self {
538        r.selectors
539    }
540}
541
542impl From<RowSelection> for VecDeque<RowSelector> {
543    fn from(r: RowSelection) -> Self {
544        r.selectors.into()
545    }
546}
547
548/// Combine two lists of `RowSelection` return the intersection of them
549/// For example:
550/// self:      NNYYYYNNYYNYN
551/// other:     NYNNNNNNY
552///
553/// returned:  NNNNNNNNYYNYN
554fn intersect_row_selections(left: &[RowSelector], right: &[RowSelector]) -> RowSelection {
555    let mut l_iter = left.iter().copied().peekable();
556    let mut r_iter = right.iter().copied().peekable();
557
558    let iter = std::iter::from_fn(move || {
559        loop {
560            let l = l_iter.peek_mut();
561            let r = r_iter.peek_mut();
562
563            match (l, r) {
564                (Some(a), _) if a.row_count == 0 => {
565                    l_iter.next().unwrap();
566                }
567                (_, Some(b)) if b.row_count == 0 => {
568                    r_iter.next().unwrap();
569                }
570                (Some(l), Some(r)) => {
571                    return match (l.skip, r.skip) {
572                        // Keep both ranges
573                        (false, false) => {
574                            if l.row_count < r.row_count {
575                                r.row_count -= l.row_count;
576                                l_iter.next()
577                            } else {
578                                l.row_count -= r.row_count;
579                                r_iter.next()
580                            }
581                        }
582                        // skip at least one
583                        _ => {
584                            if l.row_count < r.row_count {
585                                let skip = l.row_count;
586                                r.row_count -= l.row_count;
587                                l_iter.next();
588                                Some(RowSelector::skip(skip))
589                            } else {
590                                let skip = r.row_count;
591                                l.row_count -= skip;
592                                r_iter.next();
593                                Some(RowSelector::skip(skip))
594                            }
595                        }
596                    };
597                }
598                (Some(_), None) => return l_iter.next(),
599                (None, Some(_)) => return r_iter.next(),
600                (None, None) => return None,
601            }
602        }
603    });
604
605    iter.collect()
606}
607
608/// Combine two lists of `RowSelector` return the union of them
609/// For example:
610/// self:      NNYYYYNNYYNYN
611/// other:     NYNNNNNNY
612///
613/// returned:  NYYYYYNNYYNYN
614///
615/// This can be removed from here once RowSelection::union is in parquet::arrow
616fn union_row_selections(left: &[RowSelector], right: &[RowSelector]) -> RowSelection {
617    let mut l_iter = left.iter().copied().peekable();
618    let mut r_iter = right.iter().copied().peekable();
619
620    let iter = std::iter::from_fn(move || {
621        loop {
622            let l = l_iter.peek_mut();
623            let r = r_iter.peek_mut();
624
625            match (l, r) {
626                (Some(a), _) if a.row_count == 0 => {
627                    l_iter.next().unwrap();
628                }
629                (_, Some(b)) if b.row_count == 0 => {
630                    r_iter.next().unwrap();
631                }
632                (Some(l), Some(r)) => {
633                    return match (l.skip, r.skip) {
634                        // Skip both ranges
635                        (true, true) => {
636                            if l.row_count < r.row_count {
637                                let skip = l.row_count;
638                                r.row_count -= l.row_count;
639                                l_iter.next();
640                                Some(RowSelector::skip(skip))
641                            } else {
642                                let skip = r.row_count;
643                                l.row_count -= skip;
644                                r_iter.next();
645                                Some(RowSelector::skip(skip))
646                            }
647                        }
648                        // Keep rows from left
649                        (false, true) => {
650                            if l.row_count < r.row_count {
651                                r.row_count -= l.row_count;
652                                l_iter.next()
653                            } else {
654                                let r_row_count = r.row_count;
655                                l.row_count -= r_row_count;
656                                r_iter.next();
657                                Some(RowSelector::select(r_row_count))
658                            }
659                        }
660                        // Keep rows from right
661                        (true, false) => {
662                            if l.row_count < r.row_count {
663                                let l_row_count = l.row_count;
664                                r.row_count -= l_row_count;
665                                l_iter.next();
666                                Some(RowSelector::select(l_row_count))
667                            } else {
668                                l.row_count -= r.row_count;
669                                r_iter.next()
670                            }
671                        }
672                        // Keep at least one
673                        _ => {
674                            if l.row_count < r.row_count {
675                                r.row_count -= l.row_count;
676                                l_iter.next()
677                            } else {
678                                l.row_count -= r.row_count;
679                                r_iter.next()
680                            }
681                        }
682                    };
683                }
684                (Some(_), None) => return l_iter.next(),
685                (None, Some(_)) => return r_iter.next(),
686                (None, None) => return None,
687            }
688        }
689    });
690
691    iter.collect()
692}
693
694#[cfg(test)]
695mod tests {
696    use super::*;
697    use rand::{Rng, rng};
698
699    #[test]
700    fn test_from_filters() {
701        let filters = vec![
702            BooleanArray::from(vec![false, false, false, true, true, true, true]),
703            BooleanArray::from(vec![true, true, false, false, true, true, true]),
704            BooleanArray::from(vec![false, false, false, false]),
705            BooleanArray::from(Vec::<bool>::new()),
706        ];
707
708        let selection = RowSelection::from_filters(&filters[..1]);
709        assert!(selection.selects_any());
710        assert_eq!(
711            selection.selectors,
712            vec![RowSelector::skip(3), RowSelector::select(4)]
713        );
714
715        let selection = RowSelection::from_filters(&filters[..2]);
716        assert!(selection.selects_any());
717        assert_eq!(
718            selection.selectors,
719            vec![
720                RowSelector::skip(3),
721                RowSelector::select(6),
722                RowSelector::skip(2),
723                RowSelector::select(3)
724            ]
725        );
726
727        let selection = RowSelection::from_filters(&filters);
728        assert!(selection.selects_any());
729        assert_eq!(
730            selection.selectors,
731            vec![
732                RowSelector::skip(3),
733                RowSelector::select(6),
734                RowSelector::skip(2),
735                RowSelector::select(3),
736                RowSelector::skip(4)
737            ]
738        );
739
740        let selection = RowSelection::from_filters(&filters[2..3]);
741        assert!(!selection.selects_any());
742        assert_eq!(selection.selectors, vec![RowSelector::skip(4)]);
743    }
744
745    #[test]
746    fn test_split_off() {
747        let mut selection = RowSelection::from(vec![
748            RowSelector::skip(34),
749            RowSelector::select(12),
750            RowSelector::skip(3),
751            RowSelector::select(35),
752        ]);
753
754        let split = selection.split_off(34);
755        assert_eq!(split.selectors, vec![RowSelector::skip(34)]);
756        assert_eq!(
757            selection.selectors,
758            vec![
759                RowSelector::select(12),
760                RowSelector::skip(3),
761                RowSelector::select(35)
762            ]
763        );
764
765        let split = selection.split_off(5);
766        assert_eq!(split.selectors, vec![RowSelector::select(5)]);
767        assert_eq!(
768            selection.selectors,
769            vec![
770                RowSelector::select(7),
771                RowSelector::skip(3),
772                RowSelector::select(35)
773            ]
774        );
775
776        let split = selection.split_off(8);
777        assert_eq!(
778            split.selectors,
779            vec![RowSelector::select(7), RowSelector::skip(1)]
780        );
781        assert_eq!(
782            selection.selectors,
783            vec![RowSelector::skip(2), RowSelector::select(35)]
784        );
785
786        let split = selection.split_off(200);
787        assert_eq!(
788            split.selectors,
789            vec![RowSelector::skip(2), RowSelector::select(35)]
790        );
791        assert!(selection.selectors.is_empty());
792    }
793
794    #[test]
795    fn test_offset() {
796        let selection = RowSelection::from(vec![
797            RowSelector::select(5),
798            RowSelector::skip(23),
799            RowSelector::select(7),
800            RowSelector::skip(33),
801            RowSelector::select(6),
802        ]);
803
804        let selection = selection.offset(2);
805        assert_eq!(
806            selection.selectors,
807            vec![
808                RowSelector::skip(2),
809                RowSelector::select(3),
810                RowSelector::skip(23),
811                RowSelector::select(7),
812                RowSelector::skip(33),
813                RowSelector::select(6),
814            ]
815        );
816
817        let selection = selection.offset(5);
818        assert_eq!(
819            selection.selectors,
820            vec![
821                RowSelector::skip(30),
822                RowSelector::select(5),
823                RowSelector::skip(33),
824                RowSelector::select(6),
825            ]
826        );
827
828        let selection = selection.offset(3);
829        assert_eq!(
830            selection.selectors,
831            vec![
832                RowSelector::skip(33),
833                RowSelector::select(2),
834                RowSelector::skip(33),
835                RowSelector::select(6),
836            ]
837        );
838
839        let selection = selection.offset(2);
840        assert_eq!(
841            selection.selectors,
842            vec![RowSelector::skip(68), RowSelector::select(6),]
843        );
844
845        let selection = selection.offset(3);
846        assert_eq!(
847            selection.selectors,
848            vec![RowSelector::skip(71), RowSelector::select(3),]
849        );
850    }
851
852    #[test]
853    fn test_and() {
854        let mut a = RowSelection::from(vec![
855            RowSelector::skip(12),
856            RowSelector::select(23),
857            RowSelector::skip(3),
858            RowSelector::select(5),
859        ]);
860
861        let b = RowSelection::from(vec![
862            RowSelector::select(5),
863            RowSelector::skip(4),
864            RowSelector::select(15),
865            RowSelector::skip(4),
866        ]);
867
868        let mut expected = RowSelection::from(vec![
869            RowSelector::skip(12),
870            RowSelector::select(5),
871            RowSelector::skip(4),
872            RowSelector::select(14),
873            RowSelector::skip(3),
874            RowSelector::select(1),
875            RowSelector::skip(4),
876        ]);
877
878        assert_eq!(a.and_then(&b), expected);
879
880        a.split_off(7);
881        expected.split_off(7);
882        assert_eq!(a.and_then(&b), expected);
883
884        let a = RowSelection::from(vec![RowSelector::select(5), RowSelector::skip(3)]);
885
886        let b = RowSelection::from(vec![
887            RowSelector::select(2),
888            RowSelector::skip(1),
889            RowSelector::select(1),
890            RowSelector::skip(1),
891        ]);
892
893        assert_eq!(
894            a.and_then(&b).selectors,
895            vec![
896                RowSelector::select(2),
897                RowSelector::skip(1),
898                RowSelector::select(1),
899                RowSelector::skip(4)
900            ]
901        );
902    }
903
904    #[test]
905    fn test_combine() {
906        let a = vec![
907            RowSelector::skip(3),
908            RowSelector::skip(3),
909            RowSelector::select(10),
910            RowSelector::skip(4),
911        ];
912
913        let b = vec![
914            RowSelector::skip(3),
915            RowSelector::skip(3),
916            RowSelector::select(10),
917            RowSelector::skip(4),
918            RowSelector::skip(0),
919        ];
920
921        let c = vec![
922            RowSelector::skip(2),
923            RowSelector::skip(4),
924            RowSelector::select(3),
925            RowSelector::select(3),
926            RowSelector::select(4),
927            RowSelector::skip(3),
928            RowSelector::skip(1),
929            RowSelector::skip(0),
930        ];
931
932        let expected = RowSelection::from(vec![
933            RowSelector::skip(6),
934            RowSelector::select(10),
935            RowSelector::skip(4),
936        ]);
937
938        assert_eq!(RowSelection::from_iter(a), expected);
939        assert_eq!(RowSelection::from_iter(b), expected);
940        assert_eq!(RowSelection::from_iter(c), expected);
941    }
942
943    #[test]
944    fn test_combine_2elements() {
945        let a = vec![RowSelector::select(10), RowSelector::select(5)];
946        let a_expect = vec![RowSelector::select(15)];
947        assert_eq!(RowSelection::from_iter(a).selectors, a_expect);
948
949        let b = vec![RowSelector::select(10), RowSelector::skip(5)];
950        let b_expect = vec![RowSelector::select(10), RowSelector::skip(5)];
951        assert_eq!(RowSelection::from_iter(b).selectors, b_expect);
952
953        let c = vec![RowSelector::skip(10), RowSelector::select(5)];
954        let c_expect = vec![RowSelector::skip(10), RowSelector::select(5)];
955        assert_eq!(RowSelection::from_iter(c).selectors, c_expect);
956
957        let d = vec![RowSelector::skip(10), RowSelector::skip(5)];
958        let d_expect = vec![RowSelector::skip(15)];
959        assert_eq!(RowSelection::from_iter(d).selectors, d_expect);
960    }
961
962    #[test]
963    fn test_from_one_and_empty() {
964        let a = vec![RowSelector::select(10)];
965        let selection1 = RowSelection::from(a.clone());
966        assert_eq!(selection1.selectors, a);
967
968        let b = vec![];
969        let selection1 = RowSelection::from(b.clone());
970        assert_eq!(selection1.selectors, b)
971    }
972
973    #[test]
974    #[should_panic(expected = "selection exceeds the number of selected rows")]
975    fn test_and_longer() {
976        let a = RowSelection::from(vec![
977            RowSelector::select(3),
978            RowSelector::skip(33),
979            RowSelector::select(3),
980            RowSelector::skip(33),
981        ]);
982        let b = RowSelection::from(vec![RowSelector::select(36)]);
983        a.and_then(&b);
984    }
985
986    #[test]
987    #[should_panic(expected = "selection contains less than the number of selected rows")]
988    fn test_and_shorter() {
989        let a = RowSelection::from(vec![
990            RowSelector::select(3),
991            RowSelector::skip(33),
992            RowSelector::select(3),
993            RowSelector::skip(33),
994        ]);
995        let b = RowSelection::from(vec![RowSelector::select(3)]);
996        a.and_then(&b);
997    }
998
999    #[test]
1000    fn test_intersect_row_selection_and_combine() {
1001        // a size equal b size
1002        let a = vec![
1003            RowSelector::select(5),
1004            RowSelector::skip(4),
1005            RowSelector::select(1),
1006        ];
1007        let b = vec![
1008            RowSelector::select(8),
1009            RowSelector::skip(1),
1010            RowSelector::select(1),
1011        ];
1012
1013        let res = intersect_row_selections(&a, &b);
1014        assert_eq!(
1015            res.selectors,
1016            vec![
1017                RowSelector::select(5),
1018                RowSelector::skip(4),
1019                RowSelector::select(1),
1020            ],
1021        );
1022
1023        // a size larger than b size
1024        let a = vec![
1025            RowSelector::select(3),
1026            RowSelector::skip(33),
1027            RowSelector::select(3),
1028            RowSelector::skip(33),
1029        ];
1030        let b = vec![RowSelector::select(36), RowSelector::skip(36)];
1031        let res = intersect_row_selections(&a, &b);
1032        assert_eq!(
1033            res.selectors,
1034            vec![RowSelector::select(3), RowSelector::skip(69)]
1035        );
1036
1037        // a size less than b size
1038        let a = vec![RowSelector::select(3), RowSelector::skip(7)];
1039        let b = vec![
1040            RowSelector::select(2),
1041            RowSelector::skip(2),
1042            RowSelector::select(2),
1043            RowSelector::skip(2),
1044            RowSelector::select(2),
1045        ];
1046        let res = intersect_row_selections(&a, &b);
1047        assert_eq!(
1048            res.selectors,
1049            vec![RowSelector::select(2), RowSelector::skip(8)]
1050        );
1051
1052        let a = vec![RowSelector::select(3), RowSelector::skip(7)];
1053        let b = vec![
1054            RowSelector::select(2),
1055            RowSelector::skip(2),
1056            RowSelector::select(2),
1057            RowSelector::skip(2),
1058            RowSelector::select(2),
1059        ];
1060        let res = intersect_row_selections(&a, &b);
1061        assert_eq!(
1062            res.selectors,
1063            vec![RowSelector::select(2), RowSelector::skip(8)]
1064        );
1065    }
1066
1067    #[test]
1068    fn test_and_fuzz() {
1069        let mut rand = rng();
1070        for _ in 0..100 {
1071            let a_len = rand.random_range(10..100);
1072            let a_bools: Vec<_> = (0..a_len).map(|_| rand.random_bool(0.2)).collect();
1073            let a = RowSelection::from_filters(&[BooleanArray::from(a_bools.clone())]);
1074
1075            let b_len: usize = a_bools.iter().map(|x| *x as usize).sum();
1076            let b_bools: Vec<_> = (0..b_len).map(|_| rand.random_bool(0.8)).collect();
1077            let b = RowSelection::from_filters(&[BooleanArray::from(b_bools.clone())]);
1078
1079            let mut expected_bools = vec![false; a_len];
1080
1081            let mut iter_b = b_bools.iter();
1082            for (idx, b) in a_bools.iter().enumerate() {
1083                if *b && *iter_b.next().unwrap() {
1084                    expected_bools[idx] = true;
1085                }
1086            }
1087
1088            let expected = RowSelection::from_filters(&[BooleanArray::from(expected_bools)]);
1089
1090            let total_rows: usize = expected.selectors.iter().map(|s| s.row_count).sum();
1091            assert_eq!(a_len, total_rows);
1092
1093            assert_eq!(a.and_then(&b), expected);
1094        }
1095    }
1096
1097    #[test]
1098    fn test_iter() {
1099        // use the iter() API to show it does what is expected and
1100        // avoid accidental deletion
1101        let selectors = vec![
1102            RowSelector::select(3),
1103            RowSelector::skip(33),
1104            RowSelector::select(4),
1105        ];
1106
1107        let round_tripped = RowSelection::from(selectors.clone())
1108            .iter()
1109            .cloned()
1110            .collect::<Vec<_>>();
1111        assert_eq!(selectors, round_tripped);
1112    }
1113
1114    #[test]
1115    fn test_limit() {
1116        // Limit to existing limit should no-op
1117        let selection = RowSelection::from(vec![RowSelector::select(10), RowSelector::skip(90)]);
1118        let limited = selection.limit(10);
1119        assert_eq!(RowSelection::from(vec![RowSelector::select(10)]), limited);
1120
1121        let selection = RowSelection::from(vec![
1122            RowSelector::select(10),
1123            RowSelector::skip(10),
1124            RowSelector::select(10),
1125            RowSelector::skip(10),
1126            RowSelector::select(10),
1127        ]);
1128
1129        let limited = selection.clone().limit(5);
1130        let expected = vec![RowSelector::select(5)];
1131        assert_eq!(limited.selectors, expected);
1132
1133        let limited = selection.clone().limit(15);
1134        let expected = vec![
1135            RowSelector::select(10),
1136            RowSelector::skip(10),
1137            RowSelector::select(5),
1138        ];
1139        assert_eq!(limited.selectors, expected);
1140
1141        let limited = selection.clone().limit(0);
1142        let expected = vec![];
1143        assert_eq!(limited.selectors, expected);
1144
1145        let limited = selection.clone().limit(30);
1146        let expected = vec![
1147            RowSelector::select(10),
1148            RowSelector::skip(10),
1149            RowSelector::select(10),
1150            RowSelector::skip(10),
1151            RowSelector::select(10),
1152        ];
1153        assert_eq!(limited.selectors, expected);
1154
1155        let limited = selection.limit(100);
1156        let expected = vec![
1157            RowSelector::select(10),
1158            RowSelector::skip(10),
1159            RowSelector::select(10),
1160            RowSelector::skip(10),
1161            RowSelector::select(10),
1162        ];
1163        assert_eq!(limited.selectors, expected);
1164    }
1165
1166    #[test]
1167    fn test_scan_ranges() {
1168        let index = vec![
1169            PageLocation {
1170                offset: 0,
1171                compressed_page_size: 10,
1172                first_row_index: 0,
1173            },
1174            PageLocation {
1175                offset: 10,
1176                compressed_page_size: 10,
1177                first_row_index: 10,
1178            },
1179            PageLocation {
1180                offset: 20,
1181                compressed_page_size: 10,
1182                first_row_index: 20,
1183            },
1184            PageLocation {
1185                offset: 30,
1186                compressed_page_size: 10,
1187                first_row_index: 30,
1188            },
1189            PageLocation {
1190                offset: 40,
1191                compressed_page_size: 10,
1192                first_row_index: 40,
1193            },
1194            PageLocation {
1195                offset: 50,
1196                compressed_page_size: 10,
1197                first_row_index: 50,
1198            },
1199            PageLocation {
1200                offset: 60,
1201                compressed_page_size: 10,
1202                first_row_index: 60,
1203            },
1204        ];
1205
1206        let selection = RowSelection::from(vec![
1207            // Skip first page
1208            RowSelector::skip(10),
1209            // Multiple selects in same page
1210            RowSelector::select(3),
1211            RowSelector::skip(3),
1212            RowSelector::select(4),
1213            // Select to page boundary
1214            RowSelector::skip(5),
1215            RowSelector::select(5),
1216            // Skip full page past page boundary
1217            RowSelector::skip(12),
1218            // Select across page boundaries
1219            RowSelector::select(12),
1220            // Skip final page
1221            RowSelector::skip(12),
1222        ]);
1223
1224        let ranges = selection.scan_ranges(&index);
1225
1226        // assert_eq!(mask, vec![false, true, true, false, true, true, false]);
1227        assert_eq!(ranges, vec![10..20, 20..30, 40..50, 50..60]);
1228
1229        let selection = RowSelection::from(vec![
1230            // Skip first page
1231            RowSelector::skip(10),
1232            // Multiple selects in same page
1233            RowSelector::select(3),
1234            RowSelector::skip(3),
1235            RowSelector::select(4),
1236            // Select to page boundary
1237            RowSelector::skip(5),
1238            RowSelector::select(5),
1239            // Skip full page past page boundary
1240            RowSelector::skip(12),
1241            // Select across page boundaries
1242            RowSelector::select(12),
1243            RowSelector::skip(1),
1244            // Select across page boundaries including final page
1245            RowSelector::select(8),
1246        ]);
1247
1248        let ranges = selection.scan_ranges(&index);
1249
1250        // assert_eq!(mask, vec![false, true, true, false, true, true, true]);
1251        assert_eq!(ranges, vec![10..20, 20..30, 40..50, 50..60, 60..70]);
1252
1253        let selection = RowSelection::from(vec![
1254            // Skip first page
1255            RowSelector::skip(10),
1256            // Multiple selects in same page
1257            RowSelector::select(3),
1258            RowSelector::skip(3),
1259            RowSelector::select(4),
1260            // Select to page boundary
1261            RowSelector::skip(5),
1262            RowSelector::select(5),
1263            // Skip full page past page boundary
1264            RowSelector::skip(12),
1265            // Select to final page boundary
1266            RowSelector::select(12),
1267            RowSelector::skip(1),
1268            // Skip across final page boundary
1269            RowSelector::skip(8),
1270            // Select from final page
1271            RowSelector::select(4),
1272        ]);
1273
1274        let ranges = selection.scan_ranges(&index);
1275
1276        // assert_eq!(mask, vec![false, true, true, false, true, true, true]);
1277        assert_eq!(ranges, vec![10..20, 20..30, 40..50, 50..60, 60..70]);
1278
1279        let selection = RowSelection::from(vec![
1280            // Skip first page
1281            RowSelector::skip(10),
1282            // Multiple selects in same page
1283            RowSelector::select(3),
1284            RowSelector::skip(3),
1285            RowSelector::select(4),
1286            // Select to remaining in page and first row of next page
1287            RowSelector::skip(5),
1288            RowSelector::select(6),
1289            // Skip remaining
1290            RowSelector::skip(50),
1291        ]);
1292
1293        let ranges = selection.scan_ranges(&index);
1294
1295        // assert_eq!(mask, vec![false, true, true, false, true, true, true]);
1296        assert_eq!(ranges, vec![10..20, 20..30, 30..40]);
1297    }
1298
1299    #[test]
1300    fn test_from_ranges() {
1301        let ranges = [1..3, 4..6, 6..6, 8..8, 9..10];
1302        let selection = RowSelection::from_consecutive_ranges(ranges.into_iter(), 10);
1303        assert_eq!(
1304            selection.selectors,
1305            vec![
1306                RowSelector::skip(1),
1307                RowSelector::select(2),
1308                RowSelector::skip(1),
1309                RowSelector::select(2),
1310                RowSelector::skip(3),
1311                RowSelector::select(1)
1312            ]
1313        );
1314
1315        let out_of_order_ranges = [1..3, 8..10, 4..7];
1316        let result = std::panic::catch_unwind(|| {
1317            RowSelection::from_consecutive_ranges(out_of_order_ranges.into_iter(), 10)
1318        });
1319        assert!(result.is_err());
1320    }
1321
1322    #[test]
1323    fn test_empty_selector() {
1324        let selection = RowSelection::from(vec![
1325            RowSelector::skip(0),
1326            RowSelector::select(2),
1327            RowSelector::skip(0),
1328            RowSelector::select(2),
1329        ]);
1330        assert_eq!(selection.selectors, vec![RowSelector::select(4)]);
1331
1332        let selection = RowSelection::from(vec![
1333            RowSelector::select(0),
1334            RowSelector::skip(2),
1335            RowSelector::select(0),
1336            RowSelector::skip(2),
1337        ]);
1338        assert_eq!(selection.selectors, vec![RowSelector::skip(4)]);
1339    }
1340
1341    #[test]
1342    fn test_intersection() {
1343        let selection = RowSelection::from(vec![RowSelector::select(1048576)]);
1344        let result = selection.intersection(&selection);
1345        assert_eq!(result, selection);
1346
1347        let a = RowSelection::from(vec![
1348            RowSelector::skip(10),
1349            RowSelector::select(10),
1350            RowSelector::skip(10),
1351            RowSelector::select(20),
1352        ]);
1353
1354        let b = RowSelection::from(vec![
1355            RowSelector::skip(20),
1356            RowSelector::select(20),
1357            RowSelector::skip(10),
1358        ]);
1359
1360        let result = a.intersection(&b);
1361        assert_eq!(
1362            result.selectors,
1363            vec![
1364                RowSelector::skip(30),
1365                RowSelector::select(10),
1366                RowSelector::skip(10)
1367            ]
1368        );
1369    }
1370
1371    #[test]
1372    fn test_union() {
1373        let selection = RowSelection::from(vec![RowSelector::select(1048576)]);
1374        let result = selection.union(&selection);
1375        assert_eq!(result, selection);
1376
1377        // NYNYY
1378        let a = RowSelection::from(vec![
1379            RowSelector::skip(10),
1380            RowSelector::select(10),
1381            RowSelector::skip(10),
1382            RowSelector::select(20),
1383        ]);
1384
1385        // NNYYNYN
1386        let b = RowSelection::from(vec![
1387            RowSelector::skip(20),
1388            RowSelector::select(20),
1389            RowSelector::skip(10),
1390            RowSelector::select(10),
1391            RowSelector::skip(10),
1392        ]);
1393
1394        let result = a.union(&b);
1395
1396        // NYYYYYN
1397        assert_eq!(
1398            result.iter().collect::<Vec<_>>(),
1399            vec![
1400                &RowSelector::skip(10),
1401                &RowSelector::select(50),
1402                &RowSelector::skip(10),
1403            ]
1404        );
1405    }
1406
1407    #[test]
1408    fn test_row_count() {
1409        let selection = RowSelection::from(vec![
1410            RowSelector::skip(34),
1411            RowSelector::select(12),
1412            RowSelector::skip(3),
1413            RowSelector::select(35),
1414        ]);
1415
1416        assert_eq!(selection.row_count(), 12 + 35);
1417        assert_eq!(selection.skipped_row_count(), 34 + 3);
1418
1419        let selection = RowSelection::from(vec![RowSelector::select(12), RowSelector::select(35)]);
1420
1421        assert_eq!(selection.row_count(), 12 + 35);
1422        assert_eq!(selection.skipped_row_count(), 0);
1423
1424        let selection = RowSelection::from(vec![RowSelector::skip(34), RowSelector::skip(3)]);
1425
1426        assert_eq!(selection.row_count(), 0);
1427        assert_eq!(selection.skipped_row_count(), 34 + 3);
1428
1429        let selection = RowSelection::from(vec![]);
1430
1431        assert_eq!(selection.row_count(), 0);
1432        assert_eq!(selection.skipped_row_count(), 0);
1433    }
1434
1435    #[test]
1436    fn test_trim() {
1437        let selection = RowSelection::from(vec![
1438            RowSelector::skip(34),
1439            RowSelector::select(12),
1440            RowSelector::skip(3),
1441            RowSelector::select(35),
1442        ]);
1443
1444        let expected = vec![
1445            RowSelector::skip(34),
1446            RowSelector::select(12),
1447            RowSelector::skip(3),
1448            RowSelector::select(35),
1449        ];
1450
1451        assert_eq!(selection.trim().selectors, expected);
1452
1453        let selection = RowSelection::from(vec![
1454            RowSelector::skip(34),
1455            RowSelector::select(12),
1456            RowSelector::skip(3),
1457        ]);
1458
1459        let expected = vec![RowSelector::skip(34), RowSelector::select(12)];
1460
1461        assert_eq!(selection.trim().selectors, expected);
1462    }
1463}