Skip to main content

parquet/arrow/arrow_reader/
read_plan.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`ReadPlan`] and [`ReadPlanBuilder`] for determining which rows to read
19//! from a Parquet file
20
21use crate::arrow::array_reader::ArrayReader;
22use crate::arrow::arrow_reader::selection::RowSelectionPolicy;
23use crate::arrow::arrow_reader::selection::RowSelectionStrategy;
24use crate::arrow::arrow_reader::{
25    ArrowPredicate, ParquetRecordBatchReader, RowSelection, RowSelectionCursor, RowSelector,
26};
27use crate::errors::{ParquetError, Result};
28use arrow_array::{Array, BooleanArray};
29use arrow_buffer::{BooleanBuffer, BooleanBufferBuilder};
30use arrow_select::filter::prep_null_mask_filter;
31use std::collections::VecDeque;
32
33/// Options for [`ReadPlanBuilder::with_predicate_options`].
34pub struct PredicateOptions<'a> {
35    array_reader: Box<dyn ArrayReader>,
36    predicate: &'a mut dyn ArrowPredicate,
37    limit: Option<usize>,
38    total_rows: usize,
39}
40
41impl<'a> PredicateOptions<'a> {
42    /// Create options for evaluating `predicate` against rows produced by
43    /// `array_reader`.
44    ///
45    /// By default there is no match-count limit; the predicate is evaluated
46    /// over every row the reader yields. Use [`Self::with_limit`] to enable
47    /// early termination.
48    pub fn new(array_reader: Box<dyn ArrayReader>, predicate: &'a mut dyn ArrowPredicate) -> Self {
49        Self {
50            array_reader,
51            predicate,
52            limit: None,
53            total_rows: 0,
54        }
55    }
56
57    /// Stop scanning `array_reader` once `limit` matches have accumulated.
58    ///
59    /// Performance optimization for `LIMIT` / TopK: when the cumulative
60    /// `true_count` reaches `limit`, the current filter batch is truncated
61    /// at the `limit`-th match and remaining batches are never decoded.
62    ///
63    /// `limit` counts predicate matches, not output rows — callers applying
64    /// an offset must pass `offset + limit`.
65    ///
66    /// `total_rows` is the row count `array_reader` would yield if iterated
67    /// to completion. It is used to pad un-evaluated trailing rows as "not
68    /// selected" so the returned [`RowSelection`] covers the full row group.
69    ///
70    /// Only valid for the *last* predicate in a filter chain: intermediate
71    /// predicates' match counts do not map 1:1 to output rows.
72    pub fn with_limit(mut self, limit: usize, total_rows: usize) -> Self {
73        self.limit = Some(limit);
74        self.total_rows = total_rows;
75        self
76    }
77}
78
79/// A builder for [`ReadPlan`]
80#[derive(Clone, Debug)]
81pub struct ReadPlanBuilder {
82    batch_size: usize,
83    /// Which rows to select. Includes the result of all filters applied so far
84    selection: Option<RowSelection>,
85    /// Policy to use when materializing the row selection
86    row_selection_policy: RowSelectionPolicy,
87}
88
89impl ReadPlanBuilder {
90    /// Create a `ReadPlanBuilder` with the given batch size
91    pub fn new(batch_size: usize) -> Self {
92        Self {
93            batch_size,
94            selection: None,
95            row_selection_policy: RowSelectionPolicy::default(),
96        }
97    }
98
99    /// Set the current selection to the given value
100    pub fn with_selection(mut self, selection: Option<RowSelection>) -> Self {
101        self.selection = selection;
102        self
103    }
104
105    /// Configure the policy to use when materialising the [`RowSelection`]
106    ///
107    /// Defaults to [`RowSelectionPolicy::Auto`]
108    pub fn with_row_selection_policy(mut self, policy: RowSelectionPolicy) -> Self {
109        self.row_selection_policy = policy;
110        self
111    }
112
113    /// Returns the current row selection policy
114    pub fn row_selection_policy(&self) -> &RowSelectionPolicy {
115        &self.row_selection_policy
116    }
117
118    /// Returns the current selection, if any
119    pub fn selection(&self) -> Option<&RowSelection> {
120        self.selection.as_ref()
121    }
122
123    /// Specifies the number of rows in the row group, before filtering is applied.
124    ///
125    /// Returns a [`LimitedReadPlanBuilder`] that can apply
126    /// offset and limit.
127    ///
128    /// Call [`LimitedReadPlanBuilder::build_limited`] to apply the limits to this
129    /// selection.
130    pub(crate) fn limited(self, row_count: usize) -> LimitedReadPlanBuilder {
131        LimitedReadPlanBuilder::new(self, row_count)
132    }
133
134    /// Returns true if the current plan selects any rows
135    pub fn selects_any(&self) -> bool {
136        self.selection
137            .as_ref()
138            .map(|s| s.selects_any())
139            .unwrap_or(true)
140    }
141
142    /// Returns the number of rows selected, or `None` if all rows are selected.
143    pub fn num_rows_selected(&self) -> Option<usize> {
144        self.selection.as_ref().map(|s| s.row_count())
145    }
146
147    /// Returns the [`RowSelectionStrategy`] for this plan.
148    ///
149    /// Guarantees to return either `Selectors` or `Mask`, never `Auto`.
150    pub(crate) fn resolve_selection_strategy(&self) -> RowSelectionStrategy {
151        match self.row_selection_policy {
152            RowSelectionPolicy::Selectors => RowSelectionStrategy::Selectors,
153            RowSelectionPolicy::Mask => RowSelectionStrategy::Mask,
154            RowSelectionPolicy::Auto { threshold, .. } => {
155                let selection = match self.selection.as_ref() {
156                    Some(selection) => selection,
157                    None => return RowSelectionStrategy::Selectors,
158                };
159
160                // total_rows: total number of rows selected / skipped
161                // effective_count: number of non-empty selectors
162                let (total_rows, effective_count) =
163                    selection.iter().fold((0usize, 0usize), |(rows, count), s| {
164                        if s.row_count > 0 {
165                            (rows + s.row_count, count + 1)
166                        } else {
167                            (rows, count)
168                        }
169                    });
170
171                if effective_count == 0 {
172                    return RowSelectionStrategy::Mask;
173                }
174
175                if total_rows < effective_count.saturating_mul(threshold) {
176                    RowSelectionStrategy::Mask
177                } else {
178                    RowSelectionStrategy::Selectors
179                }
180            }
181        }
182    }
183
184    /// Evaluates an [`ArrowPredicate`], updating this plan's `selection`
185    ///
186    /// If the current `selection` is `Some`, the resulting [`RowSelection`]
187    /// will be the conjunction of the existing selection and the rows selected
188    /// by `predicate`.
189    ///
190    /// Note: pre-existing selections may come from evaluating a previous predicate
191    /// or if the [`ParquetRecordBatchReader`] specified an explicit
192    /// [`RowSelection`] in addition to one or more predicates.
193    pub fn with_predicate(
194        self,
195        array_reader: Box<dyn ArrayReader>,
196        predicate: &mut dyn ArrowPredicate,
197    ) -> Result<Self> {
198        self.with_predicate_options(PredicateOptions::new(array_reader, predicate))
199    }
200
201    /// Evaluates an [`ArrowPredicate`] with the given [`PredicateOptions`],
202    /// updating this plan's `selection`.
203    ///
204    /// Like [`Self::with_predicate`], but allows additional options such as a
205    /// match-count limit for early termination (see
206    /// [`PredicateOptions::with_limit`]).
207    pub fn with_predicate_options(mut self, options: PredicateOptions<'_>) -> Result<Self> {
208        let PredicateOptions {
209            array_reader,
210            predicate,
211            limit,
212            total_rows,
213        } = options;
214
215        // Target length for the concatenated filter output:
216        // - Prior selection ⇒ the reader yields that many rows; `and_then`
217        //   below requires the filter output to match.
218        // - No prior selection ⇒ the reader yields `total_rows`. We only
219        //   need to pad when `limit` may short-circuit the loop; otherwise
220        //   iteration naturally exhausts.
221        let expected_rows = match self.selection.as_ref() {
222            Some(s) => Some(s.row_count()),
223            None => limit.map(|_| total_rows),
224        };
225
226        let reader = ParquetRecordBatchReader::new(array_reader, self.clone().build());
227        let mut filters = vec![];
228        let mut processed_rows: usize = 0;
229        let mut matched_rows: usize = 0;
230        for maybe_batch in reader {
231            let maybe_batch = maybe_batch?;
232            let input_rows = maybe_batch.num_rows();
233            let filter = predicate.evaluate(maybe_batch)?;
234            // Since user supplied predicate, check error here to catch bugs quickly
235            if filter.len() != input_rows {
236                return Err(arrow_err!(
237                    "ArrowPredicate predicate returned {} rows, expected {input_rows}",
238                    filter.len()
239                ));
240            }
241            let filter = match filter.null_count() {
242                0 => filter,
243                _ => prep_null_mask_filter(&filter),
244            };
245
246            processed_rows += input_rows;
247
248            match limit {
249                Some(limit) if matched_rows + filter.true_count() >= limit => {
250                    let needed = limit - matched_rows;
251                    let truncated = truncate_filter_after_n_trues(filter, needed);
252                    filters.push(truncated);
253                    break;
254                }
255                _ => {
256                    matched_rows += filter.true_count();
257                    filters.push(filter);
258                }
259            }
260        }
261
262        // Pad the tail so the filters cover `expected_rows` total. This keeps
263        // the invariant that the resulting `RowSelection` spans every row the
264        // reader would have produced — rows past the early break are marked
265        // "not selected". When no limit is set the loop always exhausts and
266        // no padding is needed.
267        if let Some(expected) = expected_rows {
268            if processed_rows < expected {
269                let pad_len = expected - processed_rows;
270                filters.push(BooleanArray::new(BooleanBuffer::new_unset(pad_len), None));
271            }
272        }
273
274        // If the predicate selected all rows and there is no prior selection,
275        // skip creating a RowSelection entirely — this avoids the allocation
276        // and keeps selection as None which enables coalesced page fetches.
277        let all_selected = filters.iter().all(|f| f.true_count() == f.len());
278        if all_selected && self.selection.is_none() {
279            return Ok(self);
280        }
281        let raw = RowSelection::from_filters(&filters);
282        self.selection = match self.selection.take() {
283            Some(selection) => Some(selection.and_then(&raw)),
284            None => Some(raw),
285        };
286        Ok(self)
287    }
288
289    /// Create a final `ReadPlan` the read plan for the scan
290    pub fn build(mut self) -> ReadPlan {
291        // If selection is empty, truncate
292        if !self.selects_any() {
293            self.selection = Some(RowSelection::from(vec![]));
294        }
295
296        // Preferred strategy must not be Auto
297        let selection_strategy = self.resolve_selection_strategy();
298
299        let Self {
300            batch_size,
301            selection,
302            row_selection_policy: _,
303        } = self;
304
305        let selection = selection.map(|s| s.trim());
306
307        let row_selection_cursor = selection
308            .map(|s| {
309                let trimmed = s.trim();
310                let selectors: Vec<RowSelector> = trimmed.into();
311                match selection_strategy {
312                    RowSelectionStrategy::Mask => {
313                        RowSelectionCursor::new_mask_from_selectors(selectors)
314                    }
315                    RowSelectionStrategy::Selectors => RowSelectionCursor::new_selectors(selectors),
316                }
317            })
318            .unwrap_or(RowSelectionCursor::new_all());
319
320        ReadPlan {
321            batch_size,
322            row_selection_cursor,
323        }
324    }
325}
326
327/// Builder for [`ReadPlan`] that applies a limit and offset to the read plan
328///
329/// See [`ReadPlanBuilder::limited`] to create this builder.
330pub(crate) struct LimitedReadPlanBuilder {
331    /// The underlying builder
332    inner: ReadPlanBuilder,
333    /// Total number of rows in the row group before the selection, limit or
334    /// offset are applied
335    row_count: usize,
336    /// The offset to apply, if any
337    offset: Option<usize>,
338    /// The limit to apply, if any
339    limit: Option<usize>,
340}
341
342impl LimitedReadPlanBuilder {
343    /// Create a new `LimitedReadPlanBuilder` from the existing builder and number of rows
344    fn new(inner: ReadPlanBuilder, row_count: usize) -> Self {
345        Self {
346            inner,
347            row_count,
348            offset: None,
349            limit: None,
350        }
351    }
352
353    /// Set the offset to apply to the read plan
354    pub(crate) fn with_offset(mut self, offset: Option<usize>) -> Self {
355        self.offset = offset;
356        self
357    }
358
359    /// Set the limit to apply to the read plan
360    pub(crate) fn with_limit(mut self, limit: Option<usize>) -> Self {
361        self.limit = limit;
362        self
363    }
364
365    /// Apply offset and limit, updating the selection on the underlying builder
366    /// and returning it.
367    pub(crate) fn build_limited(self) -> ReadPlanBuilder {
368        let Self {
369            mut inner,
370            row_count,
371            offset,
372            limit,
373        } = self;
374
375        // If the selection is empty, truncate
376        if !inner.selects_any() {
377            inner.selection = Some(RowSelection::from(vec![]));
378        }
379
380        // If an offset is defined, apply it to the `selection`
381        if let Some(offset) = offset {
382            inner.selection = Some(match row_count.checked_sub(offset) {
383                None => RowSelection::from(vec![]),
384                Some(remaining) => inner
385                    .selection
386                    .map(|selection| selection.offset(offset))
387                    .unwrap_or_else(|| {
388                        RowSelection::from(vec![
389                            RowSelector::skip(offset),
390                            RowSelector::select(remaining),
391                        ])
392                    }),
393            });
394        }
395
396        // If a limit is defined, apply it to the final `selection`
397        if let Some(limit) = limit {
398            inner.selection = Some(
399                inner
400                    .selection
401                    .map(|selection| selection.limit(limit))
402                    .unwrap_or_else(|| {
403                        RowSelection::from(vec![RowSelector::select(limit.min(row_count))])
404                    }),
405            );
406        }
407
408        inner
409    }
410}
411
412/// Produce a new `BooleanArray` of the same length as `filter` in which only
413/// the first `n` `true` positions from `filter` remain `true`; any `true`
414/// positions beyond the first `n` are replaced with `false`.
415///
416/// `filter` must not contain nulls (callers apply [`prep_null_mask_filter`]
417/// first). If `filter` has at most `n` `true` values, a clone is returned.
418fn truncate_filter_after_n_trues(filter: BooleanArray, n: usize) -> BooleanArray {
419    if filter.true_count() <= n {
420        return filter;
421    }
422    let len = filter.len();
423    if n == 0 {
424        return BooleanArray::new(BooleanBuffer::new_unset(len), None);
425    }
426    // `set_indices` scans 64 bits at a time via `trailing_zeros`, so locating
427    // the `n`-th set bit is cheaper than visiting every bit. Everything up to
428    // and including that position is copied verbatim; the rest is zeroed.
429    let values = filter.values();
430    let last_kept = values
431        .set_indices()
432        .nth(n - 1)
433        .expect("n - 1 < true_count, checked above");
434
435    let mut builder = BooleanBufferBuilder::new(len);
436    builder.append_buffer(&values.slice(0, last_kept + 1));
437    builder.append_n(len - last_kept - 1, false);
438    BooleanArray::new(builder.finish(), None)
439}
440
441/// A plan reading specific rows from a Parquet Row Group.
442///
443/// See [`ReadPlanBuilder`] to create `ReadPlan`s
444#[derive(Debug)]
445pub struct ReadPlan {
446    /// The number of rows to read in each batch
447    batch_size: usize,
448    /// Row ranges to be selected from the data source
449    row_selection_cursor: RowSelectionCursor,
450}
451
452impl ReadPlan {
453    /// Returns a mutable reference to the selection selectors, if any
454    #[deprecated(since = "57.1.0", note = "Use `row_selection_cursor_mut` instead")]
455    pub fn selection_mut(&mut self) -> Option<&mut VecDeque<RowSelector>> {
456        if let RowSelectionCursor::Selectors(selectors_cursor) = &mut self.row_selection_cursor {
457            Some(selectors_cursor.selectors_mut())
458        } else {
459            None
460        }
461    }
462
463    /// Returns a mutable reference to the row selection cursor
464    pub fn row_selection_cursor_mut(&mut self) -> &mut RowSelectionCursor {
465        &mut self.row_selection_cursor
466    }
467
468    /// Return the number of rows to read in each output batch
469    #[inline(always)]
470    pub fn batch_size(&self) -> usize {
471        self.batch_size
472    }
473}
474
#[cfg(test)]
mod tests {
    use super::*;

    /// Builder with a batch size of 1024 and `selection` already applied
    fn builder_with_selection(selection: RowSelection) -> ReadPlanBuilder {
        let builder = ReadPlanBuilder::new(1024);
        builder.with_selection(Some(selection))
    }

    #[test]
    fn preferred_selection_strategy_prefers_mask_by_default() {
        let builder = builder_with_selection(RowSelection::from(vec![RowSelector::select(8)]));
        let strategy = builder.resolve_selection_strategy();
        assert_eq!(strategy, RowSelectionStrategy::Mask);
    }

    #[test]
    fn preferred_selection_strategy_prefers_selectors_when_threshold_small() {
        let policy = RowSelectionPolicy::Auto { threshold: 1 };
        let builder = builder_with_selection(RowSelection::from(vec![RowSelector::select(8)]))
            .with_row_selection_policy(policy);
        let strategy = builder.resolve_selection_strategy();
        assert_eq!(strategy, RowSelectionStrategy::Selectors);
    }

    #[test]
    fn truncate_filter_after_n_trues_keeps_first_n_matches() {
        // Matches sit at positions 0, 2, 3, 5 and 6
        let input = BooleanArray::from(vec![true, false, true, true, false, true, true]);
        let truncated = truncate_filter_after_n_trues(input.clone(), 3);
        assert_eq!(truncated.len(), input.len());
        assert_eq!(truncated.true_count(), 3);
        let bits: Vec<bool> = truncated.values().iter().collect();
        assert_eq!(
            bits,
            vec![true, false, true, true, false, false, false],
            "first three trues should survive, the rest become false"
        );
    }

    #[test]
    fn truncate_filter_after_n_trues_passes_through_when_already_small_enough() {
        let input = BooleanArray::from(vec![true, false, true, false]);
        let truncated = truncate_filter_after_n_trues(input.clone(), 5);
        assert_eq!(truncated.len(), input.len());
        assert_eq!(truncated.true_count(), 2);
    }

    #[test]
    fn truncate_filter_after_n_trues_zero_returns_all_false() {
        let input = BooleanArray::from(vec![true, true, true]);
        let truncated = truncate_filter_after_n_trues(input, 0);
        assert_eq!(truncated.len(), 3);
        assert_eq!(truncated.true_count(), 0);
    }

    #[test]
    fn with_predicate_options_limit_pads_tail_when_no_prior_selection() {
        use crate::arrow::ProjectionMask;
        use crate::arrow::array_reader::StructArrayReader;
        use crate::arrow::array_reader::test_util::InMemoryArrayReader;
        use crate::arrow::arrow_reader::ArrowPredicateFn;
        use arrow_array::Int32Array;
        use arrow_schema::{DataType as ArrowType, Field, Fields};
        use std::sync::Arc;

        // Every one of the 100 rows matches. The limit ends evaluation after
        // 10 matches, yet the RowSelection has to describe the whole
        // 100-row row group (the 90 unevaluated rows as "not selected")
        // instead of just the prefix that was looked at.
        const TOTAL_ROWS: usize = 100;
        const LIMIT: usize = 10;

        let array = Arc::new(Int32Array::from_iter_values(0..TOTAL_ROWS as i32));
        let leaf = InMemoryArrayReader::new(ArrowType::Int32, array, None, None);
        let column = Field::new("c0", ArrowType::Int32, false);
        let struct_type = ArrowType::Struct(Fields::from(vec![column]));
        let struct_reader = StructArrayReader::new(struct_type, vec![Box::new(leaf)], 0, 0, false);

        let mut predicate = ArrowPredicateFn::new(ProjectionMask::all(), |batch| {
            Ok(BooleanArray::from(vec![true; batch.num_rows()]))
        });

        let options = PredicateOptions::new(Box::new(struct_reader), &mut predicate)
            .with_limit(LIMIT, TOTAL_ROWS);
        let builder = ReadPlanBuilder::new(16)
            .with_predicate_options(options)
            .unwrap();

        let selection = builder
            .selection()
            .expect("limit-driven early break must produce a selection");

        // `row_count` reports selected rows only, so it equals the limit.
        assert_eq!(selection.row_count(), LIMIT);

        // Selected plus skipped rows have to add up to the full row group,
        // otherwise later offset/limit arithmetic leaves absolute-row space.
        let total = selection.iter().fold(0usize, |acc, s| acc + s.row_count);
        assert_eq!(
            total, TOTAL_ROWS,
            "selection must span the full row group, not only the prefix evaluated before the limit"
        );
    }
}