parquet/arrow/arrow_reader/
read_plan.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`ReadPlan`] and [`ReadPlanBuilder`] for determining which rows to read
19//! from a Parquet file
20
21use crate::arrow::array_reader::ArrayReader;
22use crate::arrow::arrow_reader::{
23    ArrowPredicate, ParquetRecordBatchReader, RowSelection, RowSelector,
24};
25use crate::errors::{ParquetError, Result};
26use arrow_array::Array;
27use arrow_select::filter::prep_null_mask_filter;
28use std::collections::VecDeque;
29
30/// A builder for [`ReadPlan`]
31#[derive(Clone)]
32pub(crate) struct ReadPlanBuilder {
33    batch_size: usize,
34    /// Current to apply, includes all filters
35    selection: Option<RowSelection>,
36}
37
38impl ReadPlanBuilder {
39    /// Create a `ReadPlanBuilder` with the given batch size
40    pub(crate) fn new(batch_size: usize) -> Self {
41        Self {
42            batch_size,
43            selection: None,
44        }
45    }
46
47    /// Set the current selection to the given value
48    pub(crate) fn with_selection(mut self, selection: Option<RowSelection>) -> Self {
49        self.selection = selection;
50        self
51    }
52
53    /// Returns the current selection, if any
54    pub(crate) fn selection(&self) -> Option<&RowSelection> {
55        self.selection.as_ref()
56    }
57
58    /// Specifies the number of rows in the row group, before filtering is applied.
59    ///
60    /// Returns a [`LimitedReadPlanBuilder`] that can apply
61    /// offset and limit.
62    ///
63    /// Call [`LimitedReadPlanBuilder::build_limited`] to apply the limits to this
64    /// selection.
65    pub(crate) fn limited(self, row_count: usize) -> LimitedReadPlanBuilder {
66        LimitedReadPlanBuilder::new(self, row_count)
67    }
68
69    /// Returns true if the current plan selects any rows
70    pub(crate) fn selects_any(&self) -> bool {
71        self.selection
72            .as_ref()
73            .map(|s| s.selects_any())
74            .unwrap_or(true)
75    }
76
77    /// Returns the number of rows selected, or `None` if all rows are selected.
78    pub(crate) fn num_rows_selected(&self) -> Option<usize> {
79        self.selection.as_ref().map(|s| s.row_count())
80    }
81
82    /// Evaluates an [`ArrowPredicate`], updating this plan's `selection`
83    ///
84    /// If the current `selection` is `Some`, the resulting [`RowSelection`]
85    /// will be the conjunction of the existing selection and the rows selected
86    /// by `predicate`.
87    ///
88    /// Note: pre-existing selections may come from evaluating a previous predicate
89    /// or if the [`ParquetRecordBatchReader`] specified an explicit
90    /// [`RowSelection`] in addition to one or more predicates.
91    pub(crate) fn with_predicate(
92        mut self,
93        array_reader: Box<dyn ArrayReader>,
94        predicate: &mut dyn ArrowPredicate,
95    ) -> Result<Self> {
96        let reader = ParquetRecordBatchReader::new(array_reader, self.clone().build());
97        let mut filters = vec![];
98        for maybe_batch in reader {
99            let maybe_batch = maybe_batch?;
100            let input_rows = maybe_batch.num_rows();
101            let filter = predicate.evaluate(maybe_batch)?;
102            // Since user supplied predicate, check error here to catch bugs quickly
103            if filter.len() != input_rows {
104                return Err(arrow_err!(
105                    "ArrowPredicate predicate returned {} rows, expected {input_rows}",
106                    filter.len()
107                ));
108            }
109            match filter.null_count() {
110                0 => filters.push(filter),
111                _ => filters.push(prep_null_mask_filter(&filter)),
112            };
113        }
114
115        let raw = RowSelection::from_filters(&filters);
116        self.selection = match self.selection.take() {
117            Some(selection) => Some(selection.and_then(&raw)),
118            None => Some(raw),
119        };
120        Ok(self)
121    }
122
123    /// Create a final `ReadPlan` the read plan for the scan
124    pub(crate) fn build(mut self) -> ReadPlan {
125        // If selection is empty, truncate
126        if !self.selects_any() {
127            self.selection = Some(RowSelection::from(vec![]));
128        }
129        let Self {
130            batch_size,
131            selection,
132        } = self;
133
134        let selection = selection.map(|s| s.trim().into());
135
136        ReadPlan {
137            batch_size,
138            selection,
139        }
140    }
141}
142
143/// Builder for [`ReadPlan`] that applies a limit and offset to the read plan
144///
145/// See [`ReadPlanBuilder::limited`] to create this builder.
146pub(crate) struct LimitedReadPlanBuilder {
147    /// The underlying builder
148    inner: ReadPlanBuilder,
149    /// Total number of rows in the row group before the selection, limit or
150    /// offset are applied
151    row_count: usize,
152    /// The offset to apply, if any
153    offset: Option<usize>,
154    /// The limit to apply, if any
155    limit: Option<usize>,
156}
157
158impl LimitedReadPlanBuilder {
159    /// Create a new `LimitedReadPlanBuilder` from the existing builder and number of rows
160    fn new(inner: ReadPlanBuilder, row_count: usize) -> Self {
161        Self {
162            inner,
163            row_count,
164            offset: None,
165            limit: None,
166        }
167    }
168
169    /// Set the offset to apply to the read plan
170    pub(crate) fn with_offset(mut self, offset: Option<usize>) -> Self {
171        self.offset = offset;
172        self
173    }
174
175    /// Set the limit to apply to the read plan
176    pub(crate) fn with_limit(mut self, limit: Option<usize>) -> Self {
177        self.limit = limit;
178        self
179    }
180
181    /// Apply offset and limit, updating the selection on the underlying builder
182    /// and returning it.
183    pub(crate) fn build_limited(self) -> ReadPlanBuilder {
184        let Self {
185            mut inner,
186            row_count,
187            offset,
188            limit,
189        } = self;
190
191        // If the selection is empty, truncate
192        if !inner.selects_any() {
193            inner.selection = Some(RowSelection::from(vec![]));
194        }
195
196        // If an offset is defined, apply it to the `selection`
197        if let Some(offset) = offset {
198            inner.selection = Some(match row_count.checked_sub(offset) {
199                None => RowSelection::from(vec![]),
200                Some(remaining) => inner
201                    .selection
202                    .map(|selection| selection.offset(offset))
203                    .unwrap_or_else(|| {
204                        RowSelection::from(vec![
205                            RowSelector::skip(offset),
206                            RowSelector::select(remaining),
207                        ])
208                    }),
209            });
210        }
211
212        // If a limit is defined, apply it to the final `selection`
213        if let Some(limit) = limit {
214            inner.selection = Some(
215                inner
216                    .selection
217                    .map(|selection| selection.limit(limit))
218                    .unwrap_or_else(|| {
219                        RowSelection::from(vec![RowSelector::select(limit.min(row_count))])
220                    }),
221            );
222        }
223
224        inner
225    }
226}
227
228/// A plan reading specific rows from a Parquet Row Group.
229///
230/// See [`ReadPlanBuilder`] to create `ReadPlan`s
231pub(crate) struct ReadPlan {
232    /// The number of rows to read in each batch
233    batch_size: usize,
234    /// Row ranges to be selected from the data source
235    selection: Option<VecDeque<RowSelector>>,
236}
237
238impl ReadPlan {
239    /// Returns a mutable reference to the selection, if any
240    pub(crate) fn selection_mut(&mut self) -> Option<&mut VecDeque<RowSelector>> {
241        self.selection.as_mut()
242    }
243
244    /// Return the number of rows to read in each output batch
245    #[inline(always)]
246    pub fn batch_size(&self) -> usize {
247        self.batch_size
248    }
249}