parquet/arrow/arrow_reader/
selection.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::arrow::ProjectionMask;
19use crate::errors::ParquetError;
20use crate::file::page_index::offset_index::{OffsetIndexMetaData, PageLocation};
21use arrow_array::{Array, BooleanArray};
22use arrow_buffer::{BooleanBuffer, BooleanBufferBuilder};
23use arrow_select::filter::SlicesIterator;
24use std::cmp::Ordering;
25use std::collections::VecDeque;
26use std::ops::Range;
27
/// User-facing preference for how a [`RowSelection`] is materialised during execution.
///
/// This is only a preference: the strategy that is actually used may differ
/// based on safety considerations (e.g. page skipping).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RowSelectionPolicy {
    /// Materialise the selection as a queue of [`RowSelector`] values
    Selectors,
    /// Materialise the selection as a boolean mask
    Mask,
    /// Pick [`Self::Mask`] or [`Self::Selectors`] based on selector density
    Auto {
        /// Masks are preferred when the average selector length is below this value
        threshold: usize,
    },
}

impl Default for RowSelectionPolicy {
    /// Defaults to [`RowSelectionPolicy::Auto`] with a threshold of 32
    fn default() -> Self {
        RowSelectionPolicy::Auto { threshold: 32 }
    }
}
50
/// The strategy actually used to materialise a [`RowSelection`] during execution.
///
/// It is resolved from the user preference ([`RowSelectionPolicy`]) combined
/// with safety considerations (e.g. page skipping).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum RowSelectionStrategy {
    /// Materialise the selection as a queue of [`RowSelector`] values
    Selectors,
    /// Materialise the selection as a boolean mask
    Mask,
}
62
/// A run of `row_count` consecutive rows that is either selected or skipped.
///
/// A [`RowSelection`] is a collection of [`RowSelector`]s used to skip rows
/// when scanning a parquet file.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RowSelector {
    /// The number of rows covered by this selector
    pub row_count: usize,

    /// If true, the `row_count` rows are skipped; otherwise they are selected
    pub skip: bool,
}

impl RowSelector {
    /// Creates a selector that selects `row_count` rows
    pub fn select(row_count: usize) -> Self {
        let skip = false;
        Self { row_count, skip }
    }

    /// Creates a selector that skips `row_count` rows
    pub fn skip(row_count: usize) -> Self {
        let skip = true;
        Self { row_count, skip }
    }
}
91
92/// [`RowSelection`] allows selecting or skipping a provided number of rows
93/// when scanning the parquet file.
94///
95/// This is applied prior to reading column data, and can therefore
96/// be used to skip IO to fetch data into memory
97///
98/// A typical use-case would be using the [`PageIndex`] to filter out rows
99/// that don't satisfy a predicate
100///
101/// # Example
102/// ```
103/// use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
104///
105/// let selectors = vec![
106///     RowSelector::skip(5),
107///     RowSelector::select(5),
108///     RowSelector::select(5),
109///     RowSelector::skip(5),
110/// ];
111///
112/// // Creating a selection will combine adjacent selectors
113/// let selection: RowSelection = selectors.into();
114///
115/// let expected = vec![
116///     RowSelector::skip(5),
117///     RowSelector::select(10),
118///     RowSelector::skip(5),
119/// ];
120///
121/// let actual: Vec<RowSelector> = selection.into();
122/// assert_eq!(actual, expected);
123///
124/// // you can also create a selection from consecutive ranges
125/// let ranges = vec![5..10, 10..15];
126/// let selection =
127///   RowSelection::from_consecutive_ranges(ranges.into_iter(), 20);
128/// let actual: Vec<RowSelector> = selection.into();
129/// assert_eq!(actual, expected);
130/// ```
131///
132/// A [`RowSelection`] maintains the following invariants:
133///
134/// * It contains no [`RowSelector`] of 0 rows
135/// * Consecutive [`RowSelector`]s alternate skipping or selecting rows
136///
137/// [`PageIndex`]: crate::file::page_index::column_index::ColumnIndexMetaData
138#[derive(Debug, Clone, Default, Eq, PartialEq)]
139pub struct RowSelection {
140    selectors: Vec<RowSelector>,
141}
142
143impl RowSelection {
144    /// Creates a [`RowSelection`] from a slice of [`BooleanArray`]
145    ///
146    /// # Panic
147    ///
148    /// Panics if any of the [`BooleanArray`] contain nulls
149    pub fn from_filters(filters: &[BooleanArray]) -> Self {
150        let mut next_offset = 0;
151        let total_rows = filters.iter().map(|x| x.len()).sum();
152
153        let iter = filters.iter().flat_map(|filter| {
154            let offset = next_offset;
155            next_offset += filter.len();
156            assert_eq!(filter.null_count(), 0);
157            SlicesIterator::new(filter).map(move |(start, end)| start + offset..end + offset)
158        });
159
160        Self::from_consecutive_ranges(iter, total_rows)
161    }
162
163    /// Creates a [`RowSelection`] from an iterator of consecutive ranges to keep
164    pub fn from_consecutive_ranges<I: Iterator<Item = Range<usize>>>(
165        ranges: I,
166        total_rows: usize,
167    ) -> Self {
168        let mut selectors: Vec<RowSelector> = Vec::with_capacity(ranges.size_hint().0);
169        let mut last_end = 0;
170        for range in ranges {
171            let len = range.end - range.start;
172            if len == 0 {
173                continue;
174            }
175
176            match range.start.cmp(&last_end) {
177                Ordering::Equal => match selectors.last_mut() {
178                    Some(last) => last.row_count = last.row_count.checked_add(len).unwrap(),
179                    None => selectors.push(RowSelector::select(len)),
180                },
181                Ordering::Greater => {
182                    selectors.push(RowSelector::skip(range.start - last_end));
183                    selectors.push(RowSelector::select(len))
184                }
185                Ordering::Less => panic!("out of order"),
186            }
187            last_end = range.end;
188        }
189
190        if last_end != total_rows {
191            selectors.push(RowSelector::skip(total_rows - last_end))
192        }
193
194        Self { selectors }
195    }
196
197    /// Given an offset index, return the byte ranges for all data pages selected by `self`
198    ///
199    /// This is useful for determining what byte ranges to fetch from underlying storage
200    ///
201    /// Note: this method does not make any effort to combine consecutive ranges, nor coalesce
202    /// ranges that are close together. This is instead delegated to the IO subsystem to optimise,
203    /// e.g. [`ObjectStore::get_ranges`](object_store::ObjectStore::get_ranges)
204    pub fn scan_ranges(&self, page_locations: &[PageLocation]) -> Vec<Range<u64>> {
205        let mut ranges: Vec<Range<u64>> = vec![];
206        let mut row_offset = 0;
207
208        let mut pages = page_locations.iter().peekable();
209        let mut selectors = self.selectors.iter().cloned();
210        let mut current_selector = selectors.next();
211        let mut current_page = pages.next();
212
213        let mut current_page_included = false;
214
215        while let Some((selector, page)) = current_selector.as_mut().zip(current_page) {
216            if !(selector.skip || current_page_included) {
217                let start = page.offset as u64;
218                let end = start + page.compressed_page_size as u64;
219                ranges.push(start..end);
220                current_page_included = true;
221            }
222
223            if let Some(next_page) = pages.peek() {
224                if row_offset + selector.row_count > next_page.first_row_index as usize {
225                    let remaining_in_page = next_page.first_row_index as usize - row_offset;
226                    selector.row_count -= remaining_in_page;
227                    row_offset += remaining_in_page;
228                    current_page = pages.next();
229                    current_page_included = false;
230
231                    continue;
232                } else {
233                    if row_offset + selector.row_count == next_page.first_row_index as usize {
234                        current_page = pages.next();
235                        current_page_included = false;
236                    }
237                    row_offset += selector.row_count;
238                    current_selector = selectors.next();
239                }
240            } else {
241                if !(selector.skip || current_page_included) {
242                    let start = page.offset as u64;
243                    let end = start + page.compressed_page_size as u64;
244                    ranges.push(start..end);
245                }
246                current_selector = selectors.next()
247            }
248        }
249
250        ranges
251    }
252
253    /// Returns true if this selection would skip any data pages within the provided columns
254    fn selection_skips_any_page(
255        &self,
256        projection: &ProjectionMask,
257        columns: &[OffsetIndexMetaData],
258    ) -> bool {
259        columns.iter().enumerate().any(|(leaf_idx, column)| {
260            if !projection.leaf_included(leaf_idx) {
261                return false;
262            }
263
264            let locations = column.page_locations();
265            if locations.is_empty() {
266                return false;
267            }
268
269            let ranges = self.scan_ranges(locations);
270            !ranges.is_empty() && ranges.len() < locations.len()
271        })
272    }
273
274    /// Returns true if selectors should be forced, preventing mask materialisation
275    pub(crate) fn should_force_selectors(
276        &self,
277        projection: &ProjectionMask,
278        offset_index: Option<&[OffsetIndexMetaData]>,
279    ) -> bool {
280        match offset_index {
281            Some(columns) => self.selection_skips_any_page(projection, columns),
282            None => false,
283        }
284    }
285
286    /// Splits off the first `row_count` from this [`RowSelection`]
287    pub fn split_off(&mut self, row_count: usize) -> Self {
288        let mut total_count = 0;
289
290        // Find the index where the selector exceeds the row count
291        let find = self.selectors.iter().position(|selector| {
292            total_count += selector.row_count;
293            total_count > row_count
294        });
295
296        let split_idx = match find {
297            Some(idx) => idx,
298            None => {
299                let selectors = std::mem::take(&mut self.selectors);
300                return Self { selectors };
301            }
302        };
303
304        let mut remaining = self.selectors.split_off(split_idx);
305
306        // Always present as `split_idx < self.selectors.len`
307        let next = remaining.first_mut().unwrap();
308        let overflow = total_count - row_count;
309
310        if next.row_count != overflow {
311            self.selectors.push(RowSelector {
312                row_count: next.row_count - overflow,
313                skip: next.skip,
314            })
315        }
316        next.row_count = overflow;
317
318        std::mem::swap(&mut remaining, &mut self.selectors);
319        Self {
320            selectors: remaining,
321        }
322    }
323    /// returns a [`RowSelection`] representing rows that are selected in both
324    /// input [`RowSelection`]s.
325    ///
326    /// This is equivalent to the logical `AND` / conjunction of the two
327    /// selections.
328    ///
329    /// # Example
330    /// If `N` means the row is not selected, and `Y` means it is
331    /// selected:
332    ///
333    /// ```text
334    /// self:     NNNNNNNNNNNNYYYYYYYYYYYYYYYYYYYYYYNNNYYYYY
335    /// other:                YYYYYNNNNYYYYYYYYYYYYY   YYNNN
336    ///
337    /// returned: NNNNNNNNNNNNYYYYYNNNNYYYYYYYYYYYYYNNNYYNNN
338    /// ```
339    ///
340    /// # Panics
341    ///
342    /// Panics if `other` does not have a length equal to the number of rows selected
343    /// by this RowSelection
344    ///
345    pub fn and_then(&self, other: &Self) -> Self {
346        let mut selectors = vec![];
347        let mut first = self.selectors.iter().cloned().peekable();
348        let mut second = other.selectors.iter().cloned().peekable();
349
350        let mut to_skip = 0;
351        while let Some(b) = second.peek_mut() {
352            let a = first
353                .peek_mut()
354                .expect("selection exceeds the number of selected rows");
355
356            if b.row_count == 0 {
357                second.next().unwrap();
358                continue;
359            }
360
361            if a.row_count == 0 {
362                first.next().unwrap();
363                continue;
364            }
365
366            if a.skip {
367                // Records were skipped when producing second
368                to_skip += a.row_count;
369                first.next().unwrap();
370                continue;
371            }
372
373            let skip = b.skip;
374            let to_process = a.row_count.min(b.row_count);
375
376            a.row_count -= to_process;
377            b.row_count -= to_process;
378
379            match skip {
380                true => to_skip += to_process,
381                false => {
382                    if to_skip != 0 {
383                        selectors.push(RowSelector::skip(to_skip));
384                        to_skip = 0;
385                    }
386                    selectors.push(RowSelector::select(to_process))
387                }
388            }
389        }
390
391        for v in first {
392            if v.row_count != 0 {
393                assert!(
394                    v.skip,
395                    "selection contains less than the number of selected rows"
396                );
397                to_skip += v.row_count
398            }
399        }
400
401        if to_skip != 0 {
402            selectors.push(RowSelector::skip(to_skip));
403        }
404
405        Self { selectors }
406    }
407
408    /// Compute the intersection of two [`RowSelection`]
409    /// For example:
410    /// self:      NNYYYYNNYYNYN
411    /// other:     NYNNNNNNY
412    ///
413    /// returned:  NNNNNNNNYYNYN
414    pub fn intersection(&self, other: &Self) -> Self {
415        intersect_row_selections(&self.selectors, &other.selectors)
416    }
417
418    /// Compute the union of two [`RowSelection`]
419    /// For example:
420    /// self:      NNYYYYNNYYNYN
421    /// other:     NYNNNNNNN
422    ///
423    /// returned:  NYYYYYNNYYNYN
424    pub fn union(&self, other: &Self) -> Self {
425        union_row_selections(&self.selectors, &other.selectors)
426    }
427
428    /// Returns `true` if this [`RowSelection`] selects any rows
429    pub fn selects_any(&self) -> bool {
430        self.selectors.iter().any(|x| !x.skip)
431    }
432
433    /// Trims this [`RowSelection`] removing any trailing skips
434    pub(crate) fn trim(mut self) -> Self {
435        while self.selectors.last().map(|x| x.skip).unwrap_or(false) {
436            self.selectors.pop();
437        }
438        self
439    }
440
441    /// Applies an offset to this [`RowSelection`], skipping the first `offset` selected rows
442    pub(crate) fn offset(mut self, offset: usize) -> Self {
443        if offset == 0 {
444            return self;
445        }
446
447        let mut selected_count = 0;
448        let mut skipped_count = 0;
449
450        // Find the index where the selector exceeds the row count
451        let find = self
452            .selectors
453            .iter()
454            .position(|selector| match selector.skip {
455                true => {
456                    skipped_count += selector.row_count;
457                    false
458                }
459                false => {
460                    selected_count += selector.row_count;
461                    selected_count > offset
462                }
463            });
464
465        let split_idx = match find {
466            Some(idx) => idx,
467            None => {
468                self.selectors.clear();
469                return self;
470            }
471        };
472
473        let mut selectors = Vec::with_capacity(self.selectors.len() - split_idx + 1);
474        selectors.push(RowSelector::skip(skipped_count + offset));
475        selectors.push(RowSelector::select(selected_count - offset));
476        selectors.extend_from_slice(&self.selectors[split_idx + 1..]);
477
478        Self { selectors }
479    }
480
481    /// Limit this [`RowSelection`] to only select `limit` rows
482    pub(crate) fn limit(mut self, mut limit: usize) -> Self {
483        if limit == 0 {
484            self.selectors.clear();
485        }
486
487        for (idx, selection) in self.selectors.iter_mut().enumerate() {
488            if !selection.skip {
489                if selection.row_count >= limit {
490                    selection.row_count = limit;
491                    self.selectors.truncate(idx + 1);
492                    break;
493                } else {
494                    limit -= selection.row_count;
495                }
496            }
497        }
498        self
499    }
500
501    /// Returns an iterator over the [`RowSelector`]s for this
502    /// [`RowSelection`].
503    pub fn iter(&self) -> impl Iterator<Item = &RowSelector> {
504        self.selectors.iter()
505    }
506
507    /// Returns the number of selected rows
508    pub fn row_count(&self) -> usize {
509        self.iter().filter(|s| !s.skip).map(|s| s.row_count).sum()
510    }
511
512    /// Returns the number of de-selected rows
513    pub fn skipped_row_count(&self) -> usize {
514        self.iter().filter(|s| s.skip).map(|s| s.row_count).sum()
515    }
516
517    /// Expands the selection to align with batch boundaries.
518    /// This is needed when using cached array readers to ensure that
519    /// the cached data covers full batches.
520    pub(crate) fn expand_to_batch_boundaries(&self, batch_size: usize, total_rows: usize) -> Self {
521        if batch_size == 0 {
522            return self.clone();
523        }
524
525        let mut expanded_ranges = Vec::new();
526        let mut row_offset = 0;
527
528        for selector in &self.selectors {
529            if selector.skip {
530                row_offset += selector.row_count;
531            } else {
532                let start = row_offset;
533                let end = row_offset + selector.row_count;
534
535                // Expand start to batch boundary
536                let expanded_start = (start / batch_size) * batch_size;
537                // Expand end to batch boundary
538                let expanded_end = end.div_ceil(batch_size) * batch_size;
539                let expanded_end = expanded_end.min(total_rows);
540
541                expanded_ranges.push(expanded_start..expanded_end);
542                row_offset += selector.row_count;
543            }
544        }
545
546        // Sort ranges by start position
547        expanded_ranges.sort_by_key(|range| range.start);
548
549        // Merge overlapping or consecutive ranges
550        let mut merged_ranges: Vec<Range<usize>> = Vec::new();
551        for range in expanded_ranges {
552            if let Some(last) = merged_ranges.last_mut() {
553                if range.start <= last.end {
554                    // Overlapping or consecutive - merge them
555                    last.end = last.end.max(range.end);
556                } else {
557                    // No overlap - add new range
558                    merged_ranges.push(range);
559                }
560            } else {
561                // First range
562                merged_ranges.push(range);
563            }
564        }
565
566        Self::from_consecutive_ranges(merged_ranges.into_iter(), total_rows)
567    }
568}
569
570impl From<Vec<RowSelector>> for RowSelection {
571    fn from(selectors: Vec<RowSelector>) -> Self {
572        selectors.into_iter().collect()
573    }
574}
575
576impl FromIterator<RowSelector> for RowSelection {
577    fn from_iter<T: IntoIterator<Item = RowSelector>>(iter: T) -> Self {
578        let iter = iter.into_iter();
579
580        // Capacity before filter
581        let mut selectors = Vec::with_capacity(iter.size_hint().0);
582
583        let mut filtered = iter.filter(|x| x.row_count != 0);
584        if let Some(x) = filtered.next() {
585            selectors.push(x);
586        }
587
588        for s in filtered {
589            if s.row_count == 0 {
590                continue;
591            }
592
593            // Combine consecutive selectors
594            let last = selectors.last_mut().unwrap();
595            if last.skip == s.skip {
596                last.row_count = last.row_count.checked_add(s.row_count).unwrap();
597            } else {
598                selectors.push(s)
599            }
600        }
601
602        Self { selectors }
603    }
604}
605
606impl From<RowSelection> for Vec<RowSelector> {
607    fn from(r: RowSelection) -> Self {
608        r.selectors
609    }
610}
611
612impl From<RowSelection> for VecDeque<RowSelector> {
613    fn from(r: RowSelection) -> Self {
614        r.selectors.into()
615    }
616}
617
618/// Combine two lists of `RowSelection` return the intersection of them
619/// For example:
620/// self:      NNYYYYNNYYNYN
621/// other:     NYNNNNNNY
622///
623/// returned:  NNNNNNNNYYNYN
624fn intersect_row_selections(left: &[RowSelector], right: &[RowSelector]) -> RowSelection {
625    let mut l_iter = left.iter().copied().peekable();
626    let mut r_iter = right.iter().copied().peekable();
627
628    let iter = std::iter::from_fn(move || {
629        loop {
630            let l = l_iter.peek_mut();
631            let r = r_iter.peek_mut();
632
633            match (l, r) {
634                (Some(a), _) if a.row_count == 0 => {
635                    l_iter.next().unwrap();
636                }
637                (_, Some(b)) if b.row_count == 0 => {
638                    r_iter.next().unwrap();
639                }
640                (Some(l), Some(r)) => {
641                    return match (l.skip, r.skip) {
642                        // Keep both ranges
643                        (false, false) => {
644                            if l.row_count < r.row_count {
645                                r.row_count -= l.row_count;
646                                l_iter.next()
647                            } else {
648                                l.row_count -= r.row_count;
649                                r_iter.next()
650                            }
651                        }
652                        // skip at least one
653                        _ => {
654                            if l.row_count < r.row_count {
655                                let skip = l.row_count;
656                                r.row_count -= l.row_count;
657                                l_iter.next();
658                                Some(RowSelector::skip(skip))
659                            } else {
660                                let skip = r.row_count;
661                                l.row_count -= skip;
662                                r_iter.next();
663                                Some(RowSelector::skip(skip))
664                            }
665                        }
666                    };
667                }
668                (Some(_), None) => return l_iter.next(),
669                (None, Some(_)) => return r_iter.next(),
670                (None, None) => return None,
671            }
672        }
673    });
674
675    iter.collect()
676}
677
678/// Combine two lists of `RowSelector` return the union of them
679/// For example:
680/// self:      NNYYYYNNYYNYN
681/// other:     NYNNNNNNY
682///
683/// returned:  NYYYYYNNYYNYN
684///
685/// This can be removed from here once RowSelection::union is in parquet::arrow
686fn union_row_selections(left: &[RowSelector], right: &[RowSelector]) -> RowSelection {
687    let mut l_iter = left.iter().copied().peekable();
688    let mut r_iter = right.iter().copied().peekable();
689
690    let iter = std::iter::from_fn(move || {
691        loop {
692            let l = l_iter.peek_mut();
693            let r = r_iter.peek_mut();
694
695            match (l, r) {
696                (Some(a), _) if a.row_count == 0 => {
697                    l_iter.next().unwrap();
698                }
699                (_, Some(b)) if b.row_count == 0 => {
700                    r_iter.next().unwrap();
701                }
702                (Some(l), Some(r)) => {
703                    return match (l.skip, r.skip) {
704                        // Skip both ranges
705                        (true, true) => {
706                            if l.row_count < r.row_count {
707                                let skip = l.row_count;
708                                r.row_count -= l.row_count;
709                                l_iter.next();
710                                Some(RowSelector::skip(skip))
711                            } else {
712                                let skip = r.row_count;
713                                l.row_count -= skip;
714                                r_iter.next();
715                                Some(RowSelector::skip(skip))
716                            }
717                        }
718                        // Keep rows from left
719                        (false, true) => {
720                            if l.row_count < r.row_count {
721                                r.row_count -= l.row_count;
722                                l_iter.next()
723                            } else {
724                                let r_row_count = r.row_count;
725                                l.row_count -= r_row_count;
726                                r_iter.next();
727                                Some(RowSelector::select(r_row_count))
728                            }
729                        }
730                        // Keep rows from right
731                        (true, false) => {
732                            if l.row_count < r.row_count {
733                                let l_row_count = l.row_count;
734                                r.row_count -= l_row_count;
735                                l_iter.next();
736                                Some(RowSelector::select(l_row_count))
737                            } else {
738                                l.row_count -= r.row_count;
739                                r_iter.next()
740                            }
741                        }
742                        // Keep at least one
743                        _ => {
744                            if l.row_count < r.row_count {
745                                r.row_count -= l.row_count;
746                                l_iter.next()
747                            } else {
748                                l.row_count -= r.row_count;
749                                r_iter.next()
750                            }
751                        }
752                    };
753                }
754                (Some(_), None) => return l_iter.next(),
755                (None, Some(_)) => return r_iter.next(),
756                (None, None) => return None,
757            }
758        }
759    });
760
761    iter.collect()
762}
763
764/// Cursor for iterating a mask-backed [`RowSelection`]
765///
766/// This is best for dense selections where there are many small skips
767/// or selections. For example, selecting every other row.
768#[derive(Debug)]
769pub struct MaskCursor {
770    mask: BooleanBuffer,
771    /// Current absolute offset into the selection
772    position: usize,
773}
774
775impl MaskCursor {
776    /// Returns `true` when no further rows remain
777    pub fn is_empty(&self) -> bool {
778        self.position >= self.mask.len()
779    }
780
781    /// Advance through the mask representation, producing the next chunk summary
782    pub fn next_mask_chunk(&mut self, batch_size: usize) -> Option<MaskChunk> {
783        let (initial_skip, chunk_rows, selected_rows, mask_start, end_position) = {
784            let mask = &self.mask;
785
786            if self.position >= mask.len() {
787                return None;
788            }
789
790            let start_position = self.position;
791            let mut cursor = start_position;
792            let mut initial_skip = 0;
793
794            while cursor < mask.len() && !mask.value(cursor) {
795                initial_skip += 1;
796                cursor += 1;
797            }
798
799            let mask_start = cursor;
800            let mut chunk_rows = 0;
801            let mut selected_rows = 0;
802
803            // Advance until enough rows have been selected to satisfy the batch size,
804            // or until the mask is exhausted. This mirrors the behaviour of the legacy
805            // `RowSelector` queue-based iteration.
806            while cursor < mask.len() && selected_rows < batch_size {
807                chunk_rows += 1;
808                if mask.value(cursor) {
809                    selected_rows += 1;
810                }
811                cursor += 1;
812            }
813
814            (initial_skip, chunk_rows, selected_rows, mask_start, cursor)
815        };
816
817        self.position = end_position;
818
819        Some(MaskChunk {
820            initial_skip,
821            chunk_rows,
822            selected_rows,
823            mask_start,
824        })
825    }
826
827    /// Materialise the boolean values for a mask-backed chunk
828    pub fn mask_values_for(&self, chunk: &MaskChunk) -> Result<BooleanArray, ParquetError> {
829        if chunk.mask_start.saturating_add(chunk.chunk_rows) > self.mask.len() {
830            return Err(ParquetError::General(
831                "Internal Error: MaskChunk exceeds mask length".to_string(),
832            ));
833        }
834        Ok(BooleanArray::from(
835            self.mask.slice(chunk.mask_start, chunk.chunk_rows),
836        ))
837    }
838}
839
840/// Cursor for iterating a selector-backed [`RowSelection`]
841///
842/// This is best for sparse selections where large contiguous
843/// blocks of rows are selected or skipped.
844#[derive(Debug)]
845pub struct SelectorsCursor {
846    selectors: VecDeque<RowSelector>,
847    /// Current absolute offset into the selection
848    position: usize,
849}
850
851impl SelectorsCursor {
852    /// Returns `true` when no further rows remain
853    pub fn is_empty(&self) -> bool {
854        self.selectors.is_empty()
855    }
856
857    pub(crate) fn selectors_mut(&mut self) -> &mut VecDeque<RowSelector> {
858        &mut self.selectors
859    }
860
861    /// Return the next [`RowSelector`]
862    pub(crate) fn next_selector(&mut self) -> RowSelector {
863        let selector = self.selectors.pop_front().unwrap();
864        self.position += selector.row_count;
865        selector
866    }
867
868    /// Return a selector to the front, rewinding the position
869    pub(crate) fn return_selector(&mut self, selector: RowSelector) {
870        self.position = self.position.saturating_sub(selector.row_count);
871        self.selectors.push_front(selector);
872    }
873}
874
/// Result of computing the next chunk to read when using a [`MaskCursor`]
///
/// A chunk describes a window of the mask: `mask_values_for` slices the mask
/// at `mask_start` for `chunk_rows` entries to produce the boolean values
/// belonging to it.
#[derive(Debug)]
pub struct MaskChunk {
    /// Number of leading rows to skip before reaching selected rows
    pub initial_skip: usize,
    /// Total rows covered by this chunk (selected + skipped)
    ///
    /// This is also the length of the mask slice taken for the chunk.
    pub chunk_rows: usize,
    /// Rows actually selected within the chunk
    pub selected_rows: usize,
    /// Starting offset within the mask where the chunk begins
    pub mask_start: usize,
}
887
/// Cursor for iterating a [`RowSelection`] during execution within a
/// [`ReadPlan`](crate::arrow::arrow_reader::ReadPlan).
///
/// This keeps per-reader state such as the current position and delegates the
/// actual storage strategy to the internal `RowSelectionBacking`.
///
/// The variant determines how the selection is stored: no selection at all,
/// a bitmask, or a queue of [`RowSelector`]s.
// NOTE(review): `RowSelectionBacking` is not defined in the visible part of
// this file; the variants below carry the backing cursors directly. Confirm
// whether the reference above is stale.
#[derive(Debug)]
pub enum RowSelectionCursor {
    /// Reading all rows
    All,
    /// Use a bitmask to back the selection (dense selections)
    Mask(MaskCursor),
    /// Use a queue of selectors to back the selection (sparse selections)
    Selectors(SelectorsCursor),
}
902
903impl RowSelectionCursor {
904    /// Create a [`MaskCursor`] cursor backed by a bitmask, from an existing set of selectors
905    pub(crate) fn new_mask_from_selectors(selectors: Vec<RowSelector>) -> Self {
906        Self::Mask(MaskCursor {
907            mask: boolean_mask_from_selectors(&selectors),
908            position: 0,
909        })
910    }
911
912    /// Create a [`RowSelectionCursor::Selectors`] from the provided selectors
913    pub(crate) fn new_selectors(selectors: Vec<RowSelector>) -> Self {
914        Self::Selectors(SelectorsCursor {
915            selectors: selectors.into(),
916            position: 0,
917        })
918    }
919
920    /// Create a cursor that selects all rows
921    pub(crate) fn new_all() -> Self {
922        Self::All
923    }
924}
925
926fn boolean_mask_from_selectors(selectors: &[RowSelector]) -> BooleanBuffer {
927    let total_rows: usize = selectors.iter().map(|s| s.row_count).sum();
928    let mut builder = BooleanBufferBuilder::new(total_rows);
929    for selector in selectors {
930        builder.append_n(selector.row_count, !selector.skip);
931    }
932    builder.finish()
933}
934
935#[cfg(test)]
936mod tests {
937    use super::*;
938    use rand::{Rng, rng};
939
940    #[test]
941    fn test_from_filters() {
942        let filters = vec![
943            BooleanArray::from(vec![false, false, false, true, true, true, true]),
944            BooleanArray::from(vec![true, true, false, false, true, true, true]),
945            BooleanArray::from(vec![false, false, false, false]),
946            BooleanArray::from(Vec::<bool>::new()),
947        ];
948
949        let selection = RowSelection::from_filters(&filters[..1]);
950        assert!(selection.selects_any());
951        assert_eq!(
952            selection.selectors,
953            vec![RowSelector::skip(3), RowSelector::select(4)]
954        );
955
956        let selection = RowSelection::from_filters(&filters[..2]);
957        assert!(selection.selects_any());
958        assert_eq!(
959            selection.selectors,
960            vec![
961                RowSelector::skip(3),
962                RowSelector::select(6),
963                RowSelector::skip(2),
964                RowSelector::select(3)
965            ]
966        );
967
968        let selection = RowSelection::from_filters(&filters);
969        assert!(selection.selects_any());
970        assert_eq!(
971            selection.selectors,
972            vec![
973                RowSelector::skip(3),
974                RowSelector::select(6),
975                RowSelector::skip(2),
976                RowSelector::select(3),
977                RowSelector::skip(4)
978            ]
979        );
980
981        let selection = RowSelection::from_filters(&filters[2..3]);
982        assert!(!selection.selects_any());
983        assert_eq!(selection.selectors, vec![RowSelector::skip(4)]);
984    }
985
986    #[test]
987    fn test_split_off() {
988        let mut selection = RowSelection::from(vec![
989            RowSelector::skip(34),
990            RowSelector::select(12),
991            RowSelector::skip(3),
992            RowSelector::select(35),
993        ]);
994
995        let split = selection.split_off(34);
996        assert_eq!(split.selectors, vec![RowSelector::skip(34)]);
997        assert_eq!(
998            selection.selectors,
999            vec![
1000                RowSelector::select(12),
1001                RowSelector::skip(3),
1002                RowSelector::select(35)
1003            ]
1004        );
1005
1006        let split = selection.split_off(5);
1007        assert_eq!(split.selectors, vec![RowSelector::select(5)]);
1008        assert_eq!(
1009            selection.selectors,
1010            vec![
1011                RowSelector::select(7),
1012                RowSelector::skip(3),
1013                RowSelector::select(35)
1014            ]
1015        );
1016
1017        let split = selection.split_off(8);
1018        assert_eq!(
1019            split.selectors,
1020            vec![RowSelector::select(7), RowSelector::skip(1)]
1021        );
1022        assert_eq!(
1023            selection.selectors,
1024            vec![RowSelector::skip(2), RowSelector::select(35)]
1025        );
1026
1027        let split = selection.split_off(200);
1028        assert_eq!(
1029            split.selectors,
1030            vec![RowSelector::skip(2), RowSelector::select(35)]
1031        );
1032        assert!(selection.selectors.is_empty());
1033    }
1034
1035    #[test]
1036    fn test_offset() {
1037        let selection = RowSelection::from(vec![
1038            RowSelector::select(5),
1039            RowSelector::skip(23),
1040            RowSelector::select(7),
1041            RowSelector::skip(33),
1042            RowSelector::select(6),
1043        ]);
1044
1045        let selection = selection.offset(2);
1046        assert_eq!(
1047            selection.selectors,
1048            vec![
1049                RowSelector::skip(2),
1050                RowSelector::select(3),
1051                RowSelector::skip(23),
1052                RowSelector::select(7),
1053                RowSelector::skip(33),
1054                RowSelector::select(6),
1055            ]
1056        );
1057
1058        let selection = selection.offset(5);
1059        assert_eq!(
1060            selection.selectors,
1061            vec![
1062                RowSelector::skip(30),
1063                RowSelector::select(5),
1064                RowSelector::skip(33),
1065                RowSelector::select(6),
1066            ]
1067        );
1068
1069        let selection = selection.offset(3);
1070        assert_eq!(
1071            selection.selectors,
1072            vec![
1073                RowSelector::skip(33),
1074                RowSelector::select(2),
1075                RowSelector::skip(33),
1076                RowSelector::select(6),
1077            ]
1078        );
1079
1080        let selection = selection.offset(2);
1081        assert_eq!(
1082            selection.selectors,
1083            vec![RowSelector::skip(68), RowSelector::select(6),]
1084        );
1085
1086        let selection = selection.offset(3);
1087        assert_eq!(
1088            selection.selectors,
1089            vec![RowSelector::skip(71), RowSelector::select(3),]
1090        );
1091    }
1092
1093    #[test]
1094    fn test_and() {
1095        let mut a = RowSelection::from(vec![
1096            RowSelector::skip(12),
1097            RowSelector::select(23),
1098            RowSelector::skip(3),
1099            RowSelector::select(5),
1100        ]);
1101
1102        let b = RowSelection::from(vec![
1103            RowSelector::select(5),
1104            RowSelector::skip(4),
1105            RowSelector::select(15),
1106            RowSelector::skip(4),
1107        ]);
1108
1109        let mut expected = RowSelection::from(vec![
1110            RowSelector::skip(12),
1111            RowSelector::select(5),
1112            RowSelector::skip(4),
1113            RowSelector::select(14),
1114            RowSelector::skip(3),
1115            RowSelector::select(1),
1116            RowSelector::skip(4),
1117        ]);
1118
1119        assert_eq!(a.and_then(&b), expected);
1120
1121        a.split_off(7);
1122        expected.split_off(7);
1123        assert_eq!(a.and_then(&b), expected);
1124
1125        let a = RowSelection::from(vec![RowSelector::select(5), RowSelector::skip(3)]);
1126
1127        let b = RowSelection::from(vec![
1128            RowSelector::select(2),
1129            RowSelector::skip(1),
1130            RowSelector::select(1),
1131            RowSelector::skip(1),
1132        ]);
1133
1134        assert_eq!(
1135            a.and_then(&b).selectors,
1136            vec![
1137                RowSelector::select(2),
1138                RowSelector::skip(1),
1139                RowSelector::select(1),
1140                RowSelector::skip(4)
1141            ]
1142        );
1143    }
1144
1145    #[test]
1146    fn test_combine() {
1147        let a = vec![
1148            RowSelector::skip(3),
1149            RowSelector::skip(3),
1150            RowSelector::select(10),
1151            RowSelector::skip(4),
1152        ];
1153
1154        let b = vec![
1155            RowSelector::skip(3),
1156            RowSelector::skip(3),
1157            RowSelector::select(10),
1158            RowSelector::skip(4),
1159            RowSelector::skip(0),
1160        ];
1161
1162        let c = vec![
1163            RowSelector::skip(2),
1164            RowSelector::skip(4),
1165            RowSelector::select(3),
1166            RowSelector::select(3),
1167            RowSelector::select(4),
1168            RowSelector::skip(3),
1169            RowSelector::skip(1),
1170            RowSelector::skip(0),
1171        ];
1172
1173        let expected = RowSelection::from(vec![
1174            RowSelector::skip(6),
1175            RowSelector::select(10),
1176            RowSelector::skip(4),
1177        ]);
1178
1179        assert_eq!(RowSelection::from_iter(a), expected);
1180        assert_eq!(RowSelection::from_iter(b), expected);
1181        assert_eq!(RowSelection::from_iter(c), expected);
1182    }
1183
1184    #[test]
1185    fn test_combine_2elements() {
1186        let a = vec![RowSelector::select(10), RowSelector::select(5)];
1187        let a_expect = vec![RowSelector::select(15)];
1188        assert_eq!(RowSelection::from_iter(a).selectors, a_expect);
1189
1190        let b = vec![RowSelector::select(10), RowSelector::skip(5)];
1191        let b_expect = vec![RowSelector::select(10), RowSelector::skip(5)];
1192        assert_eq!(RowSelection::from_iter(b).selectors, b_expect);
1193
1194        let c = vec![RowSelector::skip(10), RowSelector::select(5)];
1195        let c_expect = vec![RowSelector::skip(10), RowSelector::select(5)];
1196        assert_eq!(RowSelection::from_iter(c).selectors, c_expect);
1197
1198        let d = vec![RowSelector::skip(10), RowSelector::skip(5)];
1199        let d_expect = vec![RowSelector::skip(15)];
1200        assert_eq!(RowSelection::from_iter(d).selectors, d_expect);
1201    }
1202
1203    #[test]
1204    fn test_from_one_and_empty() {
1205        let a = vec![RowSelector::select(10)];
1206        let selection1 = RowSelection::from(a.clone());
1207        assert_eq!(selection1.selectors, a);
1208
1209        let b = vec![];
1210        let selection1 = RowSelection::from(b.clone());
1211        assert_eq!(selection1.selectors, b)
1212    }
1213
1214    #[test]
1215    #[should_panic(expected = "selection exceeds the number of selected rows")]
1216    fn test_and_longer() {
1217        let a = RowSelection::from(vec![
1218            RowSelector::select(3),
1219            RowSelector::skip(33),
1220            RowSelector::select(3),
1221            RowSelector::skip(33),
1222        ]);
1223        let b = RowSelection::from(vec![RowSelector::select(36)]);
1224        a.and_then(&b);
1225    }
1226
1227    #[test]
1228    #[should_panic(expected = "selection contains less than the number of selected rows")]
1229    fn test_and_shorter() {
1230        let a = RowSelection::from(vec![
1231            RowSelector::select(3),
1232            RowSelector::skip(33),
1233            RowSelector::select(3),
1234            RowSelector::skip(33),
1235        ]);
1236        let b = RowSelection::from(vec![RowSelector::select(3)]);
1237        a.and_then(&b);
1238    }
1239
1240    #[test]
1241    fn test_intersect_row_selection_and_combine() {
1242        // a size equal b size
1243        let a = vec![
1244            RowSelector::select(5),
1245            RowSelector::skip(4),
1246            RowSelector::select(1),
1247        ];
1248        let b = vec![
1249            RowSelector::select(8),
1250            RowSelector::skip(1),
1251            RowSelector::select(1),
1252        ];
1253
1254        let res = intersect_row_selections(&a, &b);
1255        assert_eq!(
1256            res.selectors,
1257            vec![
1258                RowSelector::select(5),
1259                RowSelector::skip(4),
1260                RowSelector::select(1),
1261            ],
1262        );
1263
1264        // a size larger than b size
1265        let a = vec![
1266            RowSelector::select(3),
1267            RowSelector::skip(33),
1268            RowSelector::select(3),
1269            RowSelector::skip(33),
1270        ];
1271        let b = vec![RowSelector::select(36), RowSelector::skip(36)];
1272        let res = intersect_row_selections(&a, &b);
1273        assert_eq!(
1274            res.selectors,
1275            vec![RowSelector::select(3), RowSelector::skip(69)]
1276        );
1277
1278        // a size less than b size
1279        let a = vec![RowSelector::select(3), RowSelector::skip(7)];
1280        let b = vec![
1281            RowSelector::select(2),
1282            RowSelector::skip(2),
1283            RowSelector::select(2),
1284            RowSelector::skip(2),
1285            RowSelector::select(2),
1286        ];
1287        let res = intersect_row_selections(&a, &b);
1288        assert_eq!(
1289            res.selectors,
1290            vec![RowSelector::select(2), RowSelector::skip(8)]
1291        );
1292
1293        let a = vec![RowSelector::select(3), RowSelector::skip(7)];
1294        let b = vec![
1295            RowSelector::select(2),
1296            RowSelector::skip(2),
1297            RowSelector::select(2),
1298            RowSelector::skip(2),
1299            RowSelector::select(2),
1300        ];
1301        let res = intersect_row_selections(&a, &b);
1302        assert_eq!(
1303            res.selectors,
1304            vec![RowSelector::select(2), RowSelector::skip(8)]
1305        );
1306    }
1307
1308    #[test]
1309    fn test_and_fuzz() {
1310        let mut rand = rng();
1311        for _ in 0..100 {
1312            let a_len = rand.random_range(10..100);
1313            let a_bools: Vec<_> = (0..a_len).map(|_| rand.random_bool(0.2)).collect();
1314            let a = RowSelection::from_filters(&[BooleanArray::from(a_bools.clone())]);
1315
1316            let b_len: usize = a_bools.iter().map(|x| *x as usize).sum();
1317            let b_bools: Vec<_> = (0..b_len).map(|_| rand.random_bool(0.8)).collect();
1318            let b = RowSelection::from_filters(&[BooleanArray::from(b_bools.clone())]);
1319
1320            let mut expected_bools = vec![false; a_len];
1321
1322            let mut iter_b = b_bools.iter();
1323            for (idx, b) in a_bools.iter().enumerate() {
1324                if *b && *iter_b.next().unwrap() {
1325                    expected_bools[idx] = true;
1326                }
1327            }
1328
1329            let expected = RowSelection::from_filters(&[BooleanArray::from(expected_bools)]);
1330
1331            let total_rows: usize = expected.selectors.iter().map(|s| s.row_count).sum();
1332            assert_eq!(a_len, total_rows);
1333
1334            assert_eq!(a.and_then(&b), expected);
1335        }
1336    }
1337
1338    #[test]
1339    fn test_iter() {
1340        // use the iter() API to show it does what is expected and
1341        // avoid accidental deletion
1342        let selectors = vec![
1343            RowSelector::select(3),
1344            RowSelector::skip(33),
1345            RowSelector::select(4),
1346        ];
1347
1348        let round_tripped = RowSelection::from(selectors.clone())
1349            .iter()
1350            .cloned()
1351            .collect::<Vec<_>>();
1352        assert_eq!(selectors, round_tripped);
1353    }
1354
1355    #[test]
1356    fn test_limit() {
1357        // Limit to existing limit should no-op
1358        let selection = RowSelection::from(vec![RowSelector::select(10), RowSelector::skip(90)]);
1359        let limited = selection.limit(10);
1360        assert_eq!(RowSelection::from(vec![RowSelector::select(10)]), limited);
1361
1362        let selection = RowSelection::from(vec![
1363            RowSelector::select(10),
1364            RowSelector::skip(10),
1365            RowSelector::select(10),
1366            RowSelector::skip(10),
1367            RowSelector::select(10),
1368        ]);
1369
1370        let limited = selection.clone().limit(5);
1371        let expected = vec![RowSelector::select(5)];
1372        assert_eq!(limited.selectors, expected);
1373
1374        let limited = selection.clone().limit(15);
1375        let expected = vec![
1376            RowSelector::select(10),
1377            RowSelector::skip(10),
1378            RowSelector::select(5),
1379        ];
1380        assert_eq!(limited.selectors, expected);
1381
1382        let limited = selection.clone().limit(0);
1383        let expected = vec![];
1384        assert_eq!(limited.selectors, expected);
1385
1386        let limited = selection.clone().limit(30);
1387        let expected = vec![
1388            RowSelector::select(10),
1389            RowSelector::skip(10),
1390            RowSelector::select(10),
1391            RowSelector::skip(10),
1392            RowSelector::select(10),
1393        ];
1394        assert_eq!(limited.selectors, expected);
1395
1396        let limited = selection.limit(100);
1397        let expected = vec![
1398            RowSelector::select(10),
1399            RowSelector::skip(10),
1400            RowSelector::select(10),
1401            RowSelector::skip(10),
1402            RowSelector::select(10),
1403        ];
1404        assert_eq!(limited.selectors, expected);
1405    }
1406
1407    #[test]
1408    fn test_scan_ranges() {
1409        let index = vec![
1410            PageLocation {
1411                offset: 0,
1412                compressed_page_size: 10,
1413                first_row_index: 0,
1414            },
1415            PageLocation {
1416                offset: 10,
1417                compressed_page_size: 10,
1418                first_row_index: 10,
1419            },
1420            PageLocation {
1421                offset: 20,
1422                compressed_page_size: 10,
1423                first_row_index: 20,
1424            },
1425            PageLocation {
1426                offset: 30,
1427                compressed_page_size: 10,
1428                first_row_index: 30,
1429            },
1430            PageLocation {
1431                offset: 40,
1432                compressed_page_size: 10,
1433                first_row_index: 40,
1434            },
1435            PageLocation {
1436                offset: 50,
1437                compressed_page_size: 10,
1438                first_row_index: 50,
1439            },
1440            PageLocation {
1441                offset: 60,
1442                compressed_page_size: 10,
1443                first_row_index: 60,
1444            },
1445        ];
1446
1447        let selection = RowSelection::from(vec![
1448            // Skip first page
1449            RowSelector::skip(10),
1450            // Multiple selects in same page
1451            RowSelector::select(3),
1452            RowSelector::skip(3),
1453            RowSelector::select(4),
1454            // Select to page boundary
1455            RowSelector::skip(5),
1456            RowSelector::select(5),
1457            // Skip full page past page boundary
1458            RowSelector::skip(12),
1459            // Select across page boundaries
1460            RowSelector::select(12),
1461            // Skip final page
1462            RowSelector::skip(12),
1463        ]);
1464
1465        let ranges = selection.scan_ranges(&index);
1466
1467        // assert_eq!(mask, vec![false, true, true, false, true, true, false]);
1468        assert_eq!(ranges, vec![10..20, 20..30, 40..50, 50..60]);
1469
1470        let selection = RowSelection::from(vec![
1471            // Skip first page
1472            RowSelector::skip(10),
1473            // Multiple selects in same page
1474            RowSelector::select(3),
1475            RowSelector::skip(3),
1476            RowSelector::select(4),
1477            // Select to page boundary
1478            RowSelector::skip(5),
1479            RowSelector::select(5),
1480            // Skip full page past page boundary
1481            RowSelector::skip(12),
1482            // Select across page boundaries
1483            RowSelector::select(12),
1484            RowSelector::skip(1),
1485            // Select across page boundaries including final page
1486            RowSelector::select(8),
1487        ]);
1488
1489        let ranges = selection.scan_ranges(&index);
1490
1491        // assert_eq!(mask, vec![false, true, true, false, true, true, true]);
1492        assert_eq!(ranges, vec![10..20, 20..30, 40..50, 50..60, 60..70]);
1493
1494        let selection = RowSelection::from(vec![
1495            // Skip first page
1496            RowSelector::skip(10),
1497            // Multiple selects in same page
1498            RowSelector::select(3),
1499            RowSelector::skip(3),
1500            RowSelector::select(4),
1501            // Select to page boundary
1502            RowSelector::skip(5),
1503            RowSelector::select(5),
1504            // Skip full page past page boundary
1505            RowSelector::skip(12),
1506            // Select to final page boundary
1507            RowSelector::select(12),
1508            RowSelector::skip(1),
1509            // Skip across final page boundary
1510            RowSelector::skip(8),
1511            // Select from final page
1512            RowSelector::select(4),
1513        ]);
1514
1515        let ranges = selection.scan_ranges(&index);
1516
1517        // assert_eq!(mask, vec![false, true, true, false, true, true, true]);
1518        assert_eq!(ranges, vec![10..20, 20..30, 40..50, 50..60, 60..70]);
1519
1520        let selection = RowSelection::from(vec![
1521            // Skip first page
1522            RowSelector::skip(10),
1523            // Multiple selects in same page
1524            RowSelector::select(3),
1525            RowSelector::skip(3),
1526            RowSelector::select(4),
1527            // Select to remaining in page and first row of next page
1528            RowSelector::skip(5),
1529            RowSelector::select(6),
1530            // Skip remaining
1531            RowSelector::skip(50),
1532        ]);
1533
1534        let ranges = selection.scan_ranges(&index);
1535
1536        // assert_eq!(mask, vec![false, true, true, false, true, true, true]);
1537        assert_eq!(ranges, vec![10..20, 20..30, 30..40]);
1538    }
1539
1540    #[test]
1541    fn test_from_ranges() {
1542        let ranges = [1..3, 4..6, 6..6, 8..8, 9..10];
1543        let selection = RowSelection::from_consecutive_ranges(ranges.into_iter(), 10);
1544        assert_eq!(
1545            selection.selectors,
1546            vec![
1547                RowSelector::skip(1),
1548                RowSelector::select(2),
1549                RowSelector::skip(1),
1550                RowSelector::select(2),
1551                RowSelector::skip(3),
1552                RowSelector::select(1)
1553            ]
1554        );
1555
1556        let out_of_order_ranges = [1..3, 8..10, 4..7];
1557        let result = std::panic::catch_unwind(|| {
1558            RowSelection::from_consecutive_ranges(out_of_order_ranges.into_iter(), 10)
1559        });
1560        assert!(result.is_err());
1561    }
1562
1563    #[test]
1564    fn test_empty_selector() {
1565        let selection = RowSelection::from(vec![
1566            RowSelector::skip(0),
1567            RowSelector::select(2),
1568            RowSelector::skip(0),
1569            RowSelector::select(2),
1570        ]);
1571        assert_eq!(selection.selectors, vec![RowSelector::select(4)]);
1572
1573        let selection = RowSelection::from(vec![
1574            RowSelector::select(0),
1575            RowSelector::skip(2),
1576            RowSelector::select(0),
1577            RowSelector::skip(2),
1578        ]);
1579        assert_eq!(selection.selectors, vec![RowSelector::skip(4)]);
1580    }
1581
1582    #[test]
1583    fn test_intersection() {
1584        let selection = RowSelection::from(vec![RowSelector::select(1048576)]);
1585        let result = selection.intersection(&selection);
1586        assert_eq!(result, selection);
1587
1588        let a = RowSelection::from(vec![
1589            RowSelector::skip(10),
1590            RowSelector::select(10),
1591            RowSelector::skip(10),
1592            RowSelector::select(20),
1593        ]);
1594
1595        let b = RowSelection::from(vec![
1596            RowSelector::skip(20),
1597            RowSelector::select(20),
1598            RowSelector::skip(10),
1599        ]);
1600
1601        let result = a.intersection(&b);
1602        assert_eq!(
1603            result.selectors,
1604            vec![
1605                RowSelector::skip(30),
1606                RowSelector::select(10),
1607                RowSelector::skip(10)
1608            ]
1609        );
1610    }
1611
1612    #[test]
1613    fn test_union() {
1614        let selection = RowSelection::from(vec![RowSelector::select(1048576)]);
1615        let result = selection.union(&selection);
1616        assert_eq!(result, selection);
1617
1618        // NYNYY
1619        let a = RowSelection::from(vec![
1620            RowSelector::skip(10),
1621            RowSelector::select(10),
1622            RowSelector::skip(10),
1623            RowSelector::select(20),
1624        ]);
1625
1626        // NNYYNYN
1627        let b = RowSelection::from(vec![
1628            RowSelector::skip(20),
1629            RowSelector::select(20),
1630            RowSelector::skip(10),
1631            RowSelector::select(10),
1632            RowSelector::skip(10),
1633        ]);
1634
1635        let result = a.union(&b);
1636
1637        // NYYYYYN
1638        assert_eq!(
1639            result.iter().collect::<Vec<_>>(),
1640            vec![
1641                &RowSelector::skip(10),
1642                &RowSelector::select(50),
1643                &RowSelector::skip(10),
1644            ]
1645        );
1646    }
1647
1648    #[test]
1649    fn test_row_count() {
1650        let selection = RowSelection::from(vec![
1651            RowSelector::skip(34),
1652            RowSelector::select(12),
1653            RowSelector::skip(3),
1654            RowSelector::select(35),
1655        ]);
1656
1657        assert_eq!(selection.row_count(), 12 + 35);
1658        assert_eq!(selection.skipped_row_count(), 34 + 3);
1659
1660        let selection = RowSelection::from(vec![RowSelector::select(12), RowSelector::select(35)]);
1661
1662        assert_eq!(selection.row_count(), 12 + 35);
1663        assert_eq!(selection.skipped_row_count(), 0);
1664
1665        let selection = RowSelection::from(vec![RowSelector::skip(34), RowSelector::skip(3)]);
1666
1667        assert_eq!(selection.row_count(), 0);
1668        assert_eq!(selection.skipped_row_count(), 34 + 3);
1669
1670        let selection = RowSelection::from(vec![]);
1671
1672        assert_eq!(selection.row_count(), 0);
1673        assert_eq!(selection.skipped_row_count(), 0);
1674    }
1675
1676    #[test]
1677    fn test_trim() {
1678        let selection = RowSelection::from(vec![
1679            RowSelector::skip(34),
1680            RowSelector::select(12),
1681            RowSelector::skip(3),
1682            RowSelector::select(35),
1683        ]);
1684
1685        let expected = vec![
1686            RowSelector::skip(34),
1687            RowSelector::select(12),
1688            RowSelector::skip(3),
1689            RowSelector::select(35),
1690        ];
1691
1692        assert_eq!(selection.trim().selectors, expected);
1693
1694        let selection = RowSelection::from(vec![
1695            RowSelector::skip(34),
1696            RowSelector::select(12),
1697            RowSelector::skip(3),
1698        ]);
1699
1700        let expected = vec![RowSelector::skip(34), RowSelector::select(12)];
1701
1702        assert_eq!(selection.trim().selectors, expected);
1703    }
1704}