parquet/arrow/arrow_reader/
read_plan.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`ReadPlan`] and [`ReadPlanBuilder`] for determining which rows to read
19//! from a Parquet file
20
21use crate::arrow::array_reader::ArrayReader;
22use crate::arrow::arrow_reader::{
23    ArrowPredicate, ParquetRecordBatchReader, RowSelection, RowSelector,
24};
25use crate::errors::{ParquetError, Result};
26use arrow_array::Array;
27use arrow_select::filter::prep_null_mask_filter;
28use std::collections::VecDeque;
29
30/// A builder for [`ReadPlan`]
31#[derive(Clone)]
32pub(crate) struct ReadPlanBuilder {
33    batch_size: usize,
34    /// Current to apply, includes all filters
35    selection: Option<RowSelection>,
36}
37
38impl ReadPlanBuilder {
39    /// Create a `ReadPlanBuilder` with the given batch size
40    pub(crate) fn new(batch_size: usize) -> Self {
41        Self {
42            batch_size,
43            selection: None,
44        }
45    }
46
47    /// Set the current selection to the given value
48    pub(crate) fn with_selection(mut self, selection: Option<RowSelection>) -> Self {
49        self.selection = selection;
50        self
51    }
52
53    /// Returns the current selection, if any
54    #[cfg(feature = "async")]
55    pub(crate) fn selection(&self) -> Option<&RowSelection> {
56        self.selection.as_ref()
57    }
58
59    /// Specifies the number of rows in the row group, before filtering is applied.
60    ///
61    /// Returns a [`LimitedReadPlanBuilder`] that can apply
62    /// offset and limit.
63    ///
64    /// Call [`LimitedReadPlanBuilder::build_limited`] to apply the limits to this
65    /// selection.
66    pub(crate) fn limited(self, row_count: usize) -> LimitedReadPlanBuilder {
67        LimitedReadPlanBuilder::new(self, row_count)
68    }
69
70    /// Returns true if the current plan selects any rows
71    pub(crate) fn selects_any(&self) -> bool {
72        self.selection
73            .as_ref()
74            .map(|s| s.selects_any())
75            .unwrap_or(true)
76    }
77
78    /// Returns the number of rows selected, or `None` if all rows are selected.
79    #[cfg(feature = "async")]
80    pub(crate) fn num_rows_selected(&self) -> Option<usize> {
81        self.selection.as_ref().map(|s| s.row_count())
82    }
83
84    /// Evaluates an [`ArrowPredicate`], updating this plan's `selection`
85    ///
86    /// If the current `selection` is `Some`, the resulting [`RowSelection`]
87    /// will be the conjunction of the existing selection and the rows selected
88    /// by `predicate`.
89    ///
90    /// Note: pre-existing selections may come from evaluating a previous predicate
91    /// or if the [`ParquetRecordBatchReader`] specified an explicit
92    /// [`RowSelection`] in addition to one or more predicates.
93    pub(crate) fn with_predicate(
94        mut self,
95        array_reader: Box<dyn ArrayReader>,
96        predicate: &mut dyn ArrowPredicate,
97    ) -> Result<Self> {
98        let reader = ParquetRecordBatchReader::new(array_reader, self.clone().build());
99        let mut filters = vec![];
100        for maybe_batch in reader {
101            let maybe_batch = maybe_batch?;
102            let input_rows = maybe_batch.num_rows();
103            let filter = predicate.evaluate(maybe_batch)?;
104            // Since user supplied predicate, check error here to catch bugs quickly
105            if filter.len() != input_rows {
106                return Err(arrow_err!(
107                    "ArrowPredicate predicate returned {} rows, expected {input_rows}",
108                    filter.len()
109                ));
110            }
111            match filter.null_count() {
112                0 => filters.push(filter),
113                _ => filters.push(prep_null_mask_filter(&filter)),
114            };
115        }
116
117        let raw = RowSelection::from_filters(&filters);
118        self.selection = match self.selection.take() {
119            Some(selection) => Some(selection.and_then(&raw)),
120            None => Some(raw),
121        };
122        Ok(self)
123    }
124
125    /// Create a final `ReadPlan` the read plan for the scan
126    pub(crate) fn build(mut self) -> ReadPlan {
127        // If selection is empty, truncate
128        if !self.selects_any() {
129            self.selection = Some(RowSelection::from(vec![]));
130        }
131        let Self {
132            batch_size,
133            selection,
134        } = self;
135
136        let selection = selection.map(|s| s.trim().into());
137
138        ReadPlan {
139            batch_size,
140            selection,
141        }
142    }
143}
144
145/// Builder for [`ReadPlan`] that applies a limit and offset to the read plan
146///
147/// See [`ReadPlanBuilder::limited`] to create this builder.
148pub(crate) struct LimitedReadPlanBuilder {
149    /// The underlying builder
150    inner: ReadPlanBuilder,
151    /// Total number of rows in the row group before the selection, limit or
152    /// offset are applied
153    row_count: usize,
154    /// The offset to apply, if any
155    offset: Option<usize>,
156    /// The limit to apply, if any
157    limit: Option<usize>,
158}
159
160impl LimitedReadPlanBuilder {
161    /// Create a new `LimitedReadPlanBuilder` from the existing builder and number of rows
162    fn new(inner: ReadPlanBuilder, row_count: usize) -> Self {
163        Self {
164            inner,
165            row_count,
166            offset: None,
167            limit: None,
168        }
169    }
170
171    /// Set the offset to apply to the read plan
172    pub(crate) fn with_offset(mut self, offset: Option<usize>) -> Self {
173        self.offset = offset;
174        self
175    }
176
177    /// Set the limit to apply to the read plan
178    pub(crate) fn with_limit(mut self, limit: Option<usize>) -> Self {
179        self.limit = limit;
180        self
181    }
182
183    /// Apply offset and limit, updating the selection on the underlying builder
184    /// and returning it.
185    pub(crate) fn build_limited(self) -> ReadPlanBuilder {
186        let Self {
187            mut inner,
188            row_count,
189            offset,
190            limit,
191        } = self;
192
193        // If the selection is empty, truncate
194        if !inner.selects_any() {
195            inner.selection = Some(RowSelection::from(vec![]));
196        }
197
198        // If an offset is defined, apply it to the `selection`
199        if let Some(offset) = offset {
200            inner.selection = Some(match row_count.checked_sub(offset) {
201                None => RowSelection::from(vec![]),
202                Some(remaining) => inner
203                    .selection
204                    .map(|selection| selection.offset(offset))
205                    .unwrap_or_else(|| {
206                        RowSelection::from(vec![
207                            RowSelector::skip(offset),
208                            RowSelector::select(remaining),
209                        ])
210                    }),
211            });
212        }
213
214        // If a limit is defined, apply it to the final `selection`
215        if let Some(limit) = limit {
216            inner.selection = Some(
217                inner
218                    .selection
219                    .map(|selection| selection.limit(limit))
220                    .unwrap_or_else(|| {
221                        RowSelection::from(vec![RowSelector::select(limit.min(row_count))])
222                    }),
223            );
224        }
225
226        inner
227    }
228}
229
230/// A plan reading specific rows from a Parquet Row Group.
231///
232/// See [`ReadPlanBuilder`] to create `ReadPlan`s
233pub(crate) struct ReadPlan {
234    /// The number of rows to read in each batch
235    batch_size: usize,
236    /// Row ranges to be selected from the data source
237    selection: Option<VecDeque<RowSelector>>,
238}
239
240impl ReadPlan {
241    /// Returns a mutable reference to the selection, if any
242    pub(crate) fn selection_mut(&mut self) -> Option<&mut VecDeque<RowSelector>> {
243        self.selection.as_mut()
244    }
245
246    /// Return the number of rows to read in each output batch
247    #[inline(always)]
248    pub fn batch_size(&self) -> usize {
249        self.batch_size
250    }
251}