Skip to main content

parquet/arrow/arrow_reader/
read_plan.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`ReadPlan`] and [`ReadPlanBuilder`] for determining which rows to read
19//! from a Parquet file
20
21use crate::arrow::array_reader::ArrayReader;
22use crate::arrow::arrow_reader::selection::RowSelectionPolicy;
23use crate::arrow::arrow_reader::selection::RowSelectionStrategy;
24use crate::arrow::arrow_reader::{
25    ArrowPredicate, ParquetRecordBatchReader, RowSelection, RowSelectionCursor, RowSelector,
26};
27use crate::errors::{ParquetError, Result};
28use arrow_array::Array;
29use arrow_select::filter::prep_null_mask_filter;
30use std::collections::VecDeque;
31
32/// A builder for [`ReadPlan`]
33#[derive(Clone, Debug)]
34pub struct ReadPlanBuilder {
35    batch_size: usize,
36    /// Which rows to select. Includes the result of all filters applied so far
37    selection: Option<RowSelection>,
38    /// Policy to use when materializing the row selection
39    row_selection_policy: RowSelectionPolicy,
40}
41
42impl ReadPlanBuilder {
43    /// Create a `ReadPlanBuilder` with the given batch size
44    pub fn new(batch_size: usize) -> Self {
45        Self {
46            batch_size,
47            selection: None,
48            row_selection_policy: RowSelectionPolicy::default(),
49        }
50    }
51
52    /// Set the current selection to the given value
53    pub fn with_selection(mut self, selection: Option<RowSelection>) -> Self {
54        self.selection = selection;
55        self
56    }
57
58    /// Configure the policy to use when materialising the [`RowSelection`]
59    ///
60    /// Defaults to [`RowSelectionPolicy::Auto`]
61    pub fn with_row_selection_policy(mut self, policy: RowSelectionPolicy) -> Self {
62        self.row_selection_policy = policy;
63        self
64    }
65
66    /// Returns the current row selection policy
67    pub fn row_selection_policy(&self) -> &RowSelectionPolicy {
68        &self.row_selection_policy
69    }
70
71    /// Returns the current selection, if any
72    pub fn selection(&self) -> Option<&RowSelection> {
73        self.selection.as_ref()
74    }
75
76    /// Specifies the number of rows in the row group, before filtering is applied.
77    ///
78    /// Returns a [`LimitedReadPlanBuilder`] that can apply
79    /// offset and limit.
80    ///
81    /// Call [`LimitedReadPlanBuilder::build_limited`] to apply the limits to this
82    /// selection.
83    pub(crate) fn limited(self, row_count: usize) -> LimitedReadPlanBuilder {
84        LimitedReadPlanBuilder::new(self, row_count)
85    }
86
87    /// Returns true if the current plan selects any rows
88    pub fn selects_any(&self) -> bool {
89        self.selection
90            .as_ref()
91            .map(|s| s.selects_any())
92            .unwrap_or(true)
93    }
94
95    /// Returns the number of rows selected, or `None` if all rows are selected.
96    pub fn num_rows_selected(&self) -> Option<usize> {
97        self.selection.as_ref().map(|s| s.row_count())
98    }
99
100    /// Returns the [`RowSelectionStrategy`] for this plan.
101    ///
102    /// Guarantees to return either `Selectors` or `Mask`, never `Auto`.
103    pub(crate) fn resolve_selection_strategy(&self) -> RowSelectionStrategy {
104        match self.row_selection_policy {
105            RowSelectionPolicy::Selectors => RowSelectionStrategy::Selectors,
106            RowSelectionPolicy::Mask => RowSelectionStrategy::Mask,
107            RowSelectionPolicy::Auto { threshold, .. } => {
108                let selection = match self.selection.as_ref() {
109                    Some(selection) => selection,
110                    None => return RowSelectionStrategy::Selectors,
111                };
112
113                // total_rows: total number of rows selected / skipped
114                // effective_count: number of non-empty selectors
115                let (total_rows, effective_count) =
116                    selection.iter().fold((0usize, 0usize), |(rows, count), s| {
117                        if s.row_count > 0 {
118                            (rows + s.row_count, count + 1)
119                        } else {
120                            (rows, count)
121                        }
122                    });
123
124                if effective_count == 0 {
125                    return RowSelectionStrategy::Mask;
126                }
127
128                if total_rows < effective_count.saturating_mul(threshold) {
129                    RowSelectionStrategy::Mask
130                } else {
131                    RowSelectionStrategy::Selectors
132                }
133            }
134        }
135    }
136
137    /// Evaluates an [`ArrowPredicate`], updating this plan's `selection`
138    ///
139    /// If the current `selection` is `Some`, the resulting [`RowSelection`]
140    /// will be the conjunction of the existing selection and the rows selected
141    /// by `predicate`.
142    ///
143    /// Note: pre-existing selections may come from evaluating a previous predicate
144    /// or if the [`ParquetRecordBatchReader`] specified an explicit
145    /// [`RowSelection`] in addition to one or more predicates.
146    pub fn with_predicate(
147        mut self,
148        array_reader: Box<dyn ArrayReader>,
149        predicate: &mut dyn ArrowPredicate,
150    ) -> Result<Self> {
151        let reader = ParquetRecordBatchReader::new(array_reader, self.clone().build());
152        let mut filters = vec![];
153        for maybe_batch in reader {
154            let maybe_batch = maybe_batch?;
155            let input_rows = maybe_batch.num_rows();
156            let filter = predicate.evaluate(maybe_batch)?;
157            // Since user supplied predicate, check error here to catch bugs quickly
158            if filter.len() != input_rows {
159                return Err(arrow_err!(
160                    "ArrowPredicate predicate returned {} rows, expected {input_rows}",
161                    filter.len()
162                ));
163            }
164            match filter.null_count() {
165                0 => filters.push(filter),
166                _ => filters.push(prep_null_mask_filter(&filter)),
167            };
168        }
169
170        // If the predicate selected all rows and there is no prior selection,
171        // skip creating a RowSelection entirely — this avoids the allocation
172        // and keeps selection as None which enables coalesced page fetches.
173        let all_selected = filters.iter().all(|f| f.true_count() == f.len());
174        if all_selected && self.selection.is_none() {
175            return Ok(self);
176        }
177        let raw = RowSelection::from_filters(&filters);
178        self.selection = match self.selection.take() {
179            Some(selection) => Some(selection.and_then(&raw)),
180            None => Some(raw),
181        };
182        Ok(self)
183    }
184
185    /// Create a final `ReadPlan` the read plan for the scan
186    pub fn build(mut self) -> ReadPlan {
187        // If selection is empty, truncate
188        if !self.selects_any() {
189            self.selection = Some(RowSelection::from(vec![]));
190        }
191
192        // Preferred strategy must not be Auto
193        let selection_strategy = self.resolve_selection_strategy();
194
195        let Self {
196            batch_size,
197            selection,
198            row_selection_policy: _,
199        } = self;
200
201        let selection = selection.map(|s| s.trim());
202
203        let row_selection_cursor = selection
204            .map(|s| {
205                let trimmed = s.trim();
206                let selectors: Vec<RowSelector> = trimmed.into();
207                match selection_strategy {
208                    RowSelectionStrategy::Mask => {
209                        RowSelectionCursor::new_mask_from_selectors(selectors)
210                    }
211                    RowSelectionStrategy::Selectors => RowSelectionCursor::new_selectors(selectors),
212                }
213            })
214            .unwrap_or(RowSelectionCursor::new_all());
215
216        ReadPlan {
217            batch_size,
218            row_selection_cursor,
219        }
220    }
221}
222
223/// Builder for [`ReadPlan`] that applies a limit and offset to the read plan
224///
225/// See [`ReadPlanBuilder::limited`] to create this builder.
226pub(crate) struct LimitedReadPlanBuilder {
227    /// The underlying builder
228    inner: ReadPlanBuilder,
229    /// Total number of rows in the row group before the selection, limit or
230    /// offset are applied
231    row_count: usize,
232    /// The offset to apply, if any
233    offset: Option<usize>,
234    /// The limit to apply, if any
235    limit: Option<usize>,
236}
237
238impl LimitedReadPlanBuilder {
239    /// Create a new `LimitedReadPlanBuilder` from the existing builder and number of rows
240    fn new(inner: ReadPlanBuilder, row_count: usize) -> Self {
241        Self {
242            inner,
243            row_count,
244            offset: None,
245            limit: None,
246        }
247    }
248
249    /// Set the offset to apply to the read plan
250    pub(crate) fn with_offset(mut self, offset: Option<usize>) -> Self {
251        self.offset = offset;
252        self
253    }
254
255    /// Set the limit to apply to the read plan
256    pub(crate) fn with_limit(mut self, limit: Option<usize>) -> Self {
257        self.limit = limit;
258        self
259    }
260
261    /// Apply offset and limit, updating the selection on the underlying builder
262    /// and returning it.
263    pub(crate) fn build_limited(self) -> ReadPlanBuilder {
264        let Self {
265            mut inner,
266            row_count,
267            offset,
268            limit,
269        } = self;
270
271        // If the selection is empty, truncate
272        if !inner.selects_any() {
273            inner.selection = Some(RowSelection::from(vec![]));
274        }
275
276        // If an offset is defined, apply it to the `selection`
277        if let Some(offset) = offset {
278            inner.selection = Some(match row_count.checked_sub(offset) {
279                None => RowSelection::from(vec![]),
280                Some(remaining) => inner
281                    .selection
282                    .map(|selection| selection.offset(offset))
283                    .unwrap_or_else(|| {
284                        RowSelection::from(vec![
285                            RowSelector::skip(offset),
286                            RowSelector::select(remaining),
287                        ])
288                    }),
289            });
290        }
291
292        // If a limit is defined, apply it to the final `selection`
293        if let Some(limit) = limit {
294            inner.selection = Some(
295                inner
296                    .selection
297                    .map(|selection| selection.limit(limit))
298                    .unwrap_or_else(|| {
299                        RowSelection::from(vec![RowSelector::select(limit.min(row_count))])
300                    }),
301            );
302        }
303
304        inner
305    }
306}
307
308/// A plan reading specific rows from a Parquet Row Group.
309///
310/// See [`ReadPlanBuilder`] to create `ReadPlan`s
311#[derive(Debug)]
312pub struct ReadPlan {
313    /// The number of rows to read in each batch
314    batch_size: usize,
315    /// Row ranges to be selected from the data source
316    row_selection_cursor: RowSelectionCursor,
317}
318
319impl ReadPlan {
320    /// Returns a mutable reference to the selection selectors, if any
321    #[deprecated(since = "57.1.0", note = "Use `row_selection_cursor_mut` instead")]
322    pub fn selection_mut(&mut self) -> Option<&mut VecDeque<RowSelector>> {
323        if let RowSelectionCursor::Selectors(selectors_cursor) = &mut self.row_selection_cursor {
324            Some(selectors_cursor.selectors_mut())
325        } else {
326            None
327        }
328    }
329
330    /// Returns a mutable reference to the row selection cursor
331    pub fn row_selection_cursor_mut(&mut self) -> &mut RowSelectionCursor {
332        &mut self.row_selection_cursor
333    }
334
335    /// Return the number of rows to read in each output batch
336    #[inline(always)]
337    pub fn batch_size(&self) -> usize {
338        self.batch_size
339    }
340}
341
#[cfg(test)]
mod tests {
    use super::*;

    fn builder_with_selection(selection: RowSelection) -> ReadPlanBuilder {
        ReadPlanBuilder::new(1024).with_selection(Some(selection))
    }

    #[test]
    fn preferred_selection_strategy_prefers_mask_by_default() {
        let selection = RowSelection::from(vec![RowSelector::select(8)]);
        let builder = builder_with_selection(selection);
        assert_eq!(
            builder.resolve_selection_strategy(),
            RowSelectionStrategy::Mask
        );
    }

    #[test]
    fn preferred_selection_strategy_prefers_selectors_when_threshold_small() {
        let selection = RowSelection::from(vec![RowSelector::select(8)]);
        let builder = builder_with_selection(selection)
            .with_row_selection_policy(RowSelectionPolicy::Auto { threshold: 1 });
        assert_eq!(
            builder.resolve_selection_strategy(),
            RowSelectionStrategy::Selectors
        );
    }

    /// With the default (Auto) policy and no selection, selectors are used
    #[test]
    fn auto_policy_without_selection_uses_selectors() {
        let builder = ReadPlanBuilder::new(1024);
        assert_eq!(
            builder.resolve_selection_strategy(),
            RowSelectionStrategy::Selectors
        );
    }

    /// An empty selection has no non-empty selectors, so Auto resolves to Mask
    #[test]
    fn auto_policy_with_empty_selection_uses_mask() {
        let builder = builder_with_selection(RowSelection::from(vec![]));
        assert_eq!(
            builder.resolve_selection_strategy(),
            RowSelectionStrategy::Mask
        );
    }

    /// Explicit policies bypass the Auto heuristic entirely
    #[test]
    fn explicit_policy_overrides_heuristic() {
        let builder = builder_with_selection(RowSelection::from(vec![RowSelector::select(8)]))
            .with_row_selection_policy(RowSelectionPolicy::Selectors);
        assert_eq!(
            builder.resolve_selection_strategy(),
            RowSelectionStrategy::Selectors
        );

        let builder = ReadPlanBuilder::new(1024).with_row_selection_policy(RowSelectionPolicy::Mask);
        assert_eq!(
            builder.resolve_selection_strategy(),
            RowSelectionStrategy::Mask
        );
    }

    #[test]
    fn num_rows_selected_counts_selected_rows() {
        let builder = ReadPlanBuilder::new(1024);
        assert_eq!(builder.num_rows_selected(), None);
        assert!(builder.selects_any());

        let builder = builder_with_selection(RowSelection::from(vec![
            RowSelector::skip(3),
            RowSelector::select(5),
        ]));
        assert_eq!(builder.num_rows_selected(), Some(5));
        assert!(builder.selects_any());
    }

    #[test]
    fn limited_applies_offset_then_limit() {
        let builder = ReadPlanBuilder::new(1024)
            .limited(10)
            .with_offset(Some(2))
            .with_limit(Some(3))
            .build_limited();
        assert_eq!(builder.num_rows_selected(), Some(3));
    }

    #[test]
    fn limited_offset_past_end_selects_nothing() {
        let builder = ReadPlanBuilder::new(1024)
            .limited(10)
            .with_offset(Some(20))
            .build_limited();
        assert!(!builder.selects_any());
        assert_eq!(builder.num_rows_selected(), Some(0));
    }

    #[test]
    fn limited_limit_is_capped_by_row_count() {
        let builder = ReadPlanBuilder::new(1024)
            .limited(10)
            .with_limit(Some(100))
            .build_limited();
        assert_eq!(builder.num_rows_selected(), Some(10));
    }
}