parquet/arrow/arrow_reader/
read_plan.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`ReadPlan`] and [`ReadPlanBuilder`] for determining which rows to read
19//! from a Parquet file
20
21use crate::arrow::array_reader::ArrayReader;
22use crate::arrow::arrow_reader::selection::RowSelectionPolicy;
23use crate::arrow::arrow_reader::selection::RowSelectionStrategy;
24use crate::arrow::arrow_reader::{
25    ArrowPredicate, ParquetRecordBatchReader, RowSelection, RowSelectionCursor, RowSelector,
26};
27use crate::errors::{ParquetError, Result};
28use arrow_array::Array;
29use arrow_select::filter::prep_null_mask_filter;
30use std::collections::VecDeque;
31
32/// A builder for [`ReadPlan`]
33#[derive(Clone, Debug)]
34pub struct ReadPlanBuilder {
35    batch_size: usize,
36    /// Which rows to select. Includes the result of all filters applied so far
37    selection: Option<RowSelection>,
38    /// Policy to use when materializing the row selection
39    row_selection_policy: RowSelectionPolicy,
40}
41
42impl ReadPlanBuilder {
43    /// Create a `ReadPlanBuilder` with the given batch size
44    pub fn new(batch_size: usize) -> Self {
45        Self {
46            batch_size,
47            selection: None,
48            row_selection_policy: RowSelectionPolicy::default(),
49        }
50    }
51
52    /// Set the current selection to the given value
53    pub fn with_selection(mut self, selection: Option<RowSelection>) -> Self {
54        self.selection = selection;
55        self
56    }
57
58    /// Configure the policy to use when materialising the [`RowSelection`]
59    ///
60    /// Defaults to [`RowSelectionPolicy::Auto`]
61    pub fn with_row_selection_policy(mut self, policy: RowSelectionPolicy) -> Self {
62        self.row_selection_policy = policy;
63        self
64    }
65
66    /// Returns the current row selection policy
67    pub fn row_selection_policy(&self) -> &RowSelectionPolicy {
68        &self.row_selection_policy
69    }
70
71    /// Returns the current selection, if any
72    pub fn selection(&self) -> Option<&RowSelection> {
73        self.selection.as_ref()
74    }
75
76    /// Specifies the number of rows in the row group, before filtering is applied.
77    ///
78    /// Returns a [`LimitedReadPlanBuilder`] that can apply
79    /// offset and limit.
80    ///
81    /// Call [`LimitedReadPlanBuilder::build_limited`] to apply the limits to this
82    /// selection.
83    pub(crate) fn limited(self, row_count: usize) -> LimitedReadPlanBuilder {
84        LimitedReadPlanBuilder::new(self, row_count)
85    }
86
87    /// Returns true if the current plan selects any rows
88    pub fn selects_any(&self) -> bool {
89        self.selection
90            .as_ref()
91            .map(|s| s.selects_any())
92            .unwrap_or(true)
93    }
94
95    /// Returns the number of rows selected, or `None` if all rows are selected.
96    pub fn num_rows_selected(&self) -> Option<usize> {
97        self.selection.as_ref().map(|s| s.row_count())
98    }
99
100    /// Returns the [`RowSelectionStrategy`] for this plan.
101    ///
102    /// Guarantees to return either `Selectors` or `Mask`, never `Auto`.
103    pub(crate) fn resolve_selection_strategy(&self) -> RowSelectionStrategy {
104        match self.row_selection_policy {
105            RowSelectionPolicy::Selectors => RowSelectionStrategy::Selectors,
106            RowSelectionPolicy::Mask => RowSelectionStrategy::Mask,
107            RowSelectionPolicy::Auto { threshold, .. } => {
108                let selection = match self.selection.as_ref() {
109                    Some(selection) => selection,
110                    None => return RowSelectionStrategy::Selectors,
111                };
112
113                let trimmed = selection.clone().trim();
114                let selectors: Vec<RowSelector> = trimmed.into();
115                if selectors.is_empty() {
116                    return RowSelectionStrategy::Mask;
117                }
118
119                let total_rows: usize = selectors.iter().map(|s| s.row_count).sum();
120                let selector_count = selectors.len();
121                if selector_count == 0 {
122                    return RowSelectionStrategy::Mask;
123                }
124
125                if total_rows < selector_count.saturating_mul(threshold) {
126                    RowSelectionStrategy::Mask
127                } else {
128                    RowSelectionStrategy::Selectors
129                }
130            }
131        }
132    }
133
134    /// Evaluates an [`ArrowPredicate`], updating this plan's `selection`
135    ///
136    /// If the current `selection` is `Some`, the resulting [`RowSelection`]
137    /// will be the conjunction of the existing selection and the rows selected
138    /// by `predicate`.
139    ///
140    /// Note: pre-existing selections may come from evaluating a previous predicate
141    /// or if the [`ParquetRecordBatchReader`] specified an explicit
142    /// [`RowSelection`] in addition to one or more predicates.
143    pub fn with_predicate(
144        mut self,
145        array_reader: Box<dyn ArrayReader>,
146        predicate: &mut dyn ArrowPredicate,
147    ) -> Result<Self> {
148        let reader = ParquetRecordBatchReader::new(array_reader, self.clone().build());
149        let mut filters = vec![];
150        for maybe_batch in reader {
151            let maybe_batch = maybe_batch?;
152            let input_rows = maybe_batch.num_rows();
153            let filter = predicate.evaluate(maybe_batch)?;
154            // Since user supplied predicate, check error here to catch bugs quickly
155            if filter.len() != input_rows {
156                return Err(arrow_err!(
157                    "ArrowPredicate predicate returned {} rows, expected {input_rows}",
158                    filter.len()
159                ));
160            }
161            match filter.null_count() {
162                0 => filters.push(filter),
163                _ => filters.push(prep_null_mask_filter(&filter)),
164            };
165        }
166
167        let raw = RowSelection::from_filters(&filters);
168        self.selection = match self.selection.take() {
169            Some(selection) => Some(selection.and_then(&raw)),
170            None => Some(raw),
171        };
172        Ok(self)
173    }
174
175    /// Create a final `ReadPlan` the read plan for the scan
176    pub fn build(mut self) -> ReadPlan {
177        // If selection is empty, truncate
178        if !self.selects_any() {
179            self.selection = Some(RowSelection::from(vec![]));
180        }
181
182        // Preferred strategy must not be Auto
183        let selection_strategy = self.resolve_selection_strategy();
184
185        let Self {
186            batch_size,
187            selection,
188            row_selection_policy: _,
189        } = self;
190
191        let selection = selection.map(|s| s.trim());
192
193        let row_selection_cursor = selection
194            .map(|s| {
195                let trimmed = s.trim();
196                let selectors: Vec<RowSelector> = trimmed.into();
197                match selection_strategy {
198                    RowSelectionStrategy::Mask => {
199                        RowSelectionCursor::new_mask_from_selectors(selectors)
200                    }
201                    RowSelectionStrategy::Selectors => RowSelectionCursor::new_selectors(selectors),
202                }
203            })
204            .unwrap_or(RowSelectionCursor::new_all());
205
206        ReadPlan {
207            batch_size,
208            row_selection_cursor,
209        }
210    }
211}
212
213/// Builder for [`ReadPlan`] that applies a limit and offset to the read plan
214///
215/// See [`ReadPlanBuilder::limited`] to create this builder.
216pub(crate) struct LimitedReadPlanBuilder {
217    /// The underlying builder
218    inner: ReadPlanBuilder,
219    /// Total number of rows in the row group before the selection, limit or
220    /// offset are applied
221    row_count: usize,
222    /// The offset to apply, if any
223    offset: Option<usize>,
224    /// The limit to apply, if any
225    limit: Option<usize>,
226}
227
228impl LimitedReadPlanBuilder {
229    /// Create a new `LimitedReadPlanBuilder` from the existing builder and number of rows
230    fn new(inner: ReadPlanBuilder, row_count: usize) -> Self {
231        Self {
232            inner,
233            row_count,
234            offset: None,
235            limit: None,
236        }
237    }
238
239    /// Set the offset to apply to the read plan
240    pub(crate) fn with_offset(mut self, offset: Option<usize>) -> Self {
241        self.offset = offset;
242        self
243    }
244
245    /// Set the limit to apply to the read plan
246    pub(crate) fn with_limit(mut self, limit: Option<usize>) -> Self {
247        self.limit = limit;
248        self
249    }
250
251    /// Apply offset and limit, updating the selection on the underlying builder
252    /// and returning it.
253    pub(crate) fn build_limited(self) -> ReadPlanBuilder {
254        let Self {
255            mut inner,
256            row_count,
257            offset,
258            limit,
259        } = self;
260
261        // If the selection is empty, truncate
262        if !inner.selects_any() {
263            inner.selection = Some(RowSelection::from(vec![]));
264        }
265
266        // If an offset is defined, apply it to the `selection`
267        if let Some(offset) = offset {
268            inner.selection = Some(match row_count.checked_sub(offset) {
269                None => RowSelection::from(vec![]),
270                Some(remaining) => inner
271                    .selection
272                    .map(|selection| selection.offset(offset))
273                    .unwrap_or_else(|| {
274                        RowSelection::from(vec![
275                            RowSelector::skip(offset),
276                            RowSelector::select(remaining),
277                        ])
278                    }),
279            });
280        }
281
282        // If a limit is defined, apply it to the final `selection`
283        if let Some(limit) = limit {
284            inner.selection = Some(
285                inner
286                    .selection
287                    .map(|selection| selection.limit(limit))
288                    .unwrap_or_else(|| {
289                        RowSelection::from(vec![RowSelector::select(limit.min(row_count))])
290                    }),
291            );
292        }
293
294        inner
295    }
296}
297
298/// A plan reading specific rows from a Parquet Row Group.
299///
300/// See [`ReadPlanBuilder`] to create `ReadPlan`s
301#[derive(Debug)]
302pub struct ReadPlan {
303    /// The number of rows to read in each batch
304    batch_size: usize,
305    /// Row ranges to be selected from the data source
306    row_selection_cursor: RowSelectionCursor,
307}
308
309impl ReadPlan {
310    /// Returns a mutable reference to the selection selectors, if any
311    #[deprecated(since = "57.1.0", note = "Use `row_selection_cursor_mut` instead")]
312    pub fn selection_mut(&mut self) -> Option<&mut VecDeque<RowSelector>> {
313        if let RowSelectionCursor::Selectors(selectors_cursor) = &mut self.row_selection_cursor {
314            Some(selectors_cursor.selectors_mut())
315        } else {
316            None
317        }
318    }
319
320    /// Returns a mutable reference to the row selection cursor
321    pub fn row_selection_cursor_mut(&mut self) -> &mut RowSelectionCursor {
322        &mut self.row_selection_cursor
323    }
324
325    /// Return the number of rows to read in each output batch
326    #[inline(always)]
327    pub fn batch_size(&self) -> usize {
328        self.batch_size
329    }
330}
331
#[cfg(test)]
mod tests {
    use super::*;

    /// Builder with a batch size of 1024 and the given selection
    fn builder_with_selection(selection: RowSelection) -> ReadPlanBuilder {
        ReadPlanBuilder::new(1024).with_selection(Some(selection))
    }

    /// With the default policy, a single selector of 8 rows resolves to `Mask`
    #[test]
    fn preferred_selection_strategy_prefers_mask_by_default() {
        let strategy = builder_with_selection(RowSelection::from(vec![RowSelector::select(8)]))
            .resolve_selection_strategy();
        assert_eq!(strategy, RowSelectionStrategy::Mask);
    }

    /// With `threshold: 1`, the same selection resolves to `Selectors`
    #[test]
    fn preferred_selection_strategy_prefers_selectors_when_threshold_small() {
        let policy = RowSelectionPolicy::Auto { threshold: 1 };
        let strategy = builder_with_selection(RowSelection::from(vec![RowSelector::select(8)]))
            .with_row_selection_policy(policy)
            .resolve_selection_strategy();
        assert_eq!(strategy, RowSelectionStrategy::Selectors);
    }
}