parquet/arrow/arrow_reader/
read_plan.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`ReadPlan`] and [`ReadPlanBuilder`] for determining which rows to read
19//! from a Parquet file
20
21use crate::arrow::array_reader::ArrayReader;
22use crate::arrow::arrow_reader::selection::RowSelectionPolicy;
23use crate::arrow::arrow_reader::selection::RowSelectionStrategy;
24use crate::arrow::arrow_reader::{
25    ArrowPredicate, ParquetRecordBatchReader, RowSelection, RowSelectionCursor, RowSelector,
26};
27use crate::errors::{ParquetError, Result};
28use arrow_array::Array;
29use arrow_select::filter::prep_null_mask_filter;
30use std::collections::VecDeque;
31
32/// A builder for [`ReadPlan`]
33#[derive(Clone, Debug)]
34pub struct ReadPlanBuilder {
35    batch_size: usize,
36    /// Which rows to select. Includes the result of all filters applied so far
37    selection: Option<RowSelection>,
38    /// Policy to use when materializing the row selection
39    row_selection_policy: RowSelectionPolicy,
40}
41
42impl ReadPlanBuilder {
43    /// Create a `ReadPlanBuilder` with the given batch size
44    pub fn new(batch_size: usize) -> Self {
45        Self {
46            batch_size,
47            selection: None,
48            row_selection_policy: RowSelectionPolicy::default(),
49        }
50    }
51
52    /// Set the current selection to the given value
53    pub fn with_selection(mut self, selection: Option<RowSelection>) -> Self {
54        self.selection = selection;
55        self
56    }
57
58    /// Configure the policy to use when materialising the [`RowSelection`]
59    ///
60    /// Defaults to [`RowSelectionPolicy::Auto`]
61    pub fn with_row_selection_policy(mut self, policy: RowSelectionPolicy) -> Self {
62        self.row_selection_policy = policy;
63        self
64    }
65
66    /// Returns the current row selection policy
67    pub fn row_selection_policy(&self) -> &RowSelectionPolicy {
68        &self.row_selection_policy
69    }
70
71    /// Returns the current selection, if any
72    pub fn selection(&self) -> Option<&RowSelection> {
73        self.selection.as_ref()
74    }
75
76    /// Specifies the number of rows in the row group, before filtering is applied.
77    ///
78    /// Returns a [`LimitedReadPlanBuilder`] that can apply
79    /// offset and limit.
80    ///
81    /// Call [`LimitedReadPlanBuilder::build_limited`] to apply the limits to this
82    /// selection.
83    pub(crate) fn limited(self, row_count: usize) -> LimitedReadPlanBuilder {
84        LimitedReadPlanBuilder::new(self, row_count)
85    }
86
87    /// Returns true if the current plan selects any rows
88    pub fn selects_any(&self) -> bool {
89        self.selection
90            .as_ref()
91            .map(|s| s.selects_any())
92            .unwrap_or(true)
93    }
94
95    /// Returns the number of rows selected, or `None` if all rows are selected.
96    pub fn num_rows_selected(&self) -> Option<usize> {
97        self.selection.as_ref().map(|s| s.row_count())
98    }
99
100    /// Returns the [`RowSelectionStrategy`] for this plan.
101    ///
102    /// Guarantees to return either `Selectors` or `Mask`, never `Auto`.
103    pub(crate) fn resolve_selection_strategy(&self) -> RowSelectionStrategy {
104        match self.row_selection_policy {
105            RowSelectionPolicy::Selectors => RowSelectionStrategy::Selectors,
106            RowSelectionPolicy::Mask => RowSelectionStrategy::Mask,
107            RowSelectionPolicy::Auto { threshold, .. } => {
108                let selection = match self.selection.as_ref() {
109                    Some(selection) => selection,
110                    None => return RowSelectionStrategy::Selectors,
111                };
112
113                // total_rows: total number of rows selected / skipped
114                // effective_count: number of non-empty selectors
115                let (total_rows, effective_count) =
116                    selection.iter().fold((0usize, 0usize), |(rows, count), s| {
117                        if s.row_count > 0 {
118                            (rows + s.row_count, count + 1)
119                        } else {
120                            (rows, count)
121                        }
122                    });
123
124                if effective_count == 0 {
125                    return RowSelectionStrategy::Mask;
126                }
127
128                if total_rows < effective_count.saturating_mul(threshold) {
129                    RowSelectionStrategy::Mask
130                } else {
131                    RowSelectionStrategy::Selectors
132                }
133            }
134        }
135    }
136
137    /// Evaluates an [`ArrowPredicate`], updating this plan's `selection`
138    ///
139    /// If the current `selection` is `Some`, the resulting [`RowSelection`]
140    /// will be the conjunction of the existing selection and the rows selected
141    /// by `predicate`.
142    ///
143    /// Note: pre-existing selections may come from evaluating a previous predicate
144    /// or if the [`ParquetRecordBatchReader`] specified an explicit
145    /// [`RowSelection`] in addition to one or more predicates.
146    pub fn with_predicate(
147        mut self,
148        array_reader: Box<dyn ArrayReader>,
149        predicate: &mut dyn ArrowPredicate,
150    ) -> Result<Self> {
151        let reader = ParquetRecordBatchReader::new(array_reader, self.clone().build());
152        let mut filters = vec![];
153        for maybe_batch in reader {
154            let maybe_batch = maybe_batch?;
155            let input_rows = maybe_batch.num_rows();
156            let filter = predicate.evaluate(maybe_batch)?;
157            // Since user supplied predicate, check error here to catch bugs quickly
158            if filter.len() != input_rows {
159                return Err(arrow_err!(
160                    "ArrowPredicate predicate returned {} rows, expected {input_rows}",
161                    filter.len()
162                ));
163            }
164            match filter.null_count() {
165                0 => filters.push(filter),
166                _ => filters.push(prep_null_mask_filter(&filter)),
167            };
168        }
169
170        let raw = RowSelection::from_filters(&filters);
171        self.selection = match self.selection.take() {
172            Some(selection) => Some(selection.and_then(&raw)),
173            None => Some(raw),
174        };
175        Ok(self)
176    }
177
178    /// Create a final `ReadPlan` the read plan for the scan
179    pub fn build(mut self) -> ReadPlan {
180        // If selection is empty, truncate
181        if !self.selects_any() {
182            self.selection = Some(RowSelection::from(vec![]));
183        }
184
185        // Preferred strategy must not be Auto
186        let selection_strategy = self.resolve_selection_strategy();
187
188        let Self {
189            batch_size,
190            selection,
191            row_selection_policy: _,
192        } = self;
193
194        let selection = selection.map(|s| s.trim());
195
196        let row_selection_cursor = selection
197            .map(|s| {
198                let trimmed = s.trim();
199                let selectors: Vec<RowSelector> = trimmed.into();
200                match selection_strategy {
201                    RowSelectionStrategy::Mask => {
202                        RowSelectionCursor::new_mask_from_selectors(selectors)
203                    }
204                    RowSelectionStrategy::Selectors => RowSelectionCursor::new_selectors(selectors),
205                }
206            })
207            .unwrap_or(RowSelectionCursor::new_all());
208
209        ReadPlan {
210            batch_size,
211            row_selection_cursor,
212        }
213    }
214}
215
216/// Builder for [`ReadPlan`] that applies a limit and offset to the read plan
217///
218/// See [`ReadPlanBuilder::limited`] to create this builder.
219pub(crate) struct LimitedReadPlanBuilder {
220    /// The underlying builder
221    inner: ReadPlanBuilder,
222    /// Total number of rows in the row group before the selection, limit or
223    /// offset are applied
224    row_count: usize,
225    /// The offset to apply, if any
226    offset: Option<usize>,
227    /// The limit to apply, if any
228    limit: Option<usize>,
229}
230
231impl LimitedReadPlanBuilder {
232    /// Create a new `LimitedReadPlanBuilder` from the existing builder and number of rows
233    fn new(inner: ReadPlanBuilder, row_count: usize) -> Self {
234        Self {
235            inner,
236            row_count,
237            offset: None,
238            limit: None,
239        }
240    }
241
242    /// Set the offset to apply to the read plan
243    pub(crate) fn with_offset(mut self, offset: Option<usize>) -> Self {
244        self.offset = offset;
245        self
246    }
247
248    /// Set the limit to apply to the read plan
249    pub(crate) fn with_limit(mut self, limit: Option<usize>) -> Self {
250        self.limit = limit;
251        self
252    }
253
254    /// Apply offset and limit, updating the selection on the underlying builder
255    /// and returning it.
256    pub(crate) fn build_limited(self) -> ReadPlanBuilder {
257        let Self {
258            mut inner,
259            row_count,
260            offset,
261            limit,
262        } = self;
263
264        // If the selection is empty, truncate
265        if !inner.selects_any() {
266            inner.selection = Some(RowSelection::from(vec![]));
267        }
268
269        // If an offset is defined, apply it to the `selection`
270        if let Some(offset) = offset {
271            inner.selection = Some(match row_count.checked_sub(offset) {
272                None => RowSelection::from(vec![]),
273                Some(remaining) => inner
274                    .selection
275                    .map(|selection| selection.offset(offset))
276                    .unwrap_or_else(|| {
277                        RowSelection::from(vec![
278                            RowSelector::skip(offset),
279                            RowSelector::select(remaining),
280                        ])
281                    }),
282            });
283        }
284
285        // If a limit is defined, apply it to the final `selection`
286        if let Some(limit) = limit {
287            inner.selection = Some(
288                inner
289                    .selection
290                    .map(|selection| selection.limit(limit))
291                    .unwrap_or_else(|| {
292                        RowSelection::from(vec![RowSelector::select(limit.min(row_count))])
293                    }),
294            );
295        }
296
297        inner
298    }
299}
300
301/// A plan reading specific rows from a Parquet Row Group.
302///
303/// See [`ReadPlanBuilder`] to create `ReadPlan`s
304#[derive(Debug)]
305pub struct ReadPlan {
306    /// The number of rows to read in each batch
307    batch_size: usize,
308    /// Row ranges to be selected from the data source
309    row_selection_cursor: RowSelectionCursor,
310}
311
312impl ReadPlan {
313    /// Returns a mutable reference to the selection selectors, if any
314    #[deprecated(since = "57.1.0", note = "Use `row_selection_cursor_mut` instead")]
315    pub fn selection_mut(&mut self) -> Option<&mut VecDeque<RowSelector>> {
316        if let RowSelectionCursor::Selectors(selectors_cursor) = &mut self.row_selection_cursor {
317            Some(selectors_cursor.selectors_mut())
318        } else {
319            None
320        }
321    }
322
323    /// Returns a mutable reference to the row selection cursor
324    pub fn row_selection_cursor_mut(&mut self) -> &mut RowSelectionCursor {
325        &mut self.row_selection_cursor
326    }
327
328    /// Return the number of rows to read in each output batch
329    #[inline(always)]
330    pub fn batch_size(&self) -> usize {
331        self.batch_size
332    }
333}
334
#[cfg(test)]
mod tests {
    use super::*;

    /// Builder with a batch size of 1024 and the given selection installed
    fn builder_with_selection(selection: RowSelection) -> ReadPlanBuilder {
        let builder = ReadPlanBuilder::new(1024);
        builder.with_selection(Some(selection))
    }

    /// A single 8-row selector under the default policy resolves to `Mask`
    #[test]
    fn preferred_selection_strategy_prefers_mask_by_default() {
        let selectors = vec![RowSelector::select(8)];
        let strategy =
            builder_with_selection(RowSelection::from(selectors)).resolve_selection_strategy();
        assert_eq!(strategy, RowSelectionStrategy::Mask);
    }

    /// The same selection with an `Auto` threshold of 1 resolves to `Selectors`
    #[test]
    fn preferred_selection_strategy_prefers_selectors_when_threshold_small() {
        let selectors = vec![RowSelector::select(8)];
        let policy = RowSelectionPolicy::Auto { threshold: 1 };
        let strategy = builder_with_selection(RowSelection::from(selectors))
            .with_row_selection_policy(policy)
            .resolve_selection_strategy();
        assert_eq!(strategy, RowSelectionStrategy::Selectors);
    }
}
363}