parquet/arrow/arrow_reader/read_plan.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`ReadPlan`] and [`ReadPlanBuilder`] for determining which rows to read
19//! from a Parquet file
20
21use crate::arrow::array_reader::ArrayReader;
22use crate::arrow::arrow_reader::{
23 ArrowPredicate, ParquetRecordBatchReader, RowSelection, RowSelector,
24};
25use crate::errors::{ParquetError, Result};
26use arrow_array::Array;
27use arrow_select::filter::prep_null_mask_filter;
28use std::collections::VecDeque;
29
30/// A builder for [`ReadPlan`]
31#[derive(Clone)]
32pub(crate) struct ReadPlanBuilder {
33 batch_size: usize,
34 /// Current to apply, includes all filters
35 selection: Option<RowSelection>,
36}
37
38impl ReadPlanBuilder {
39 /// Create a `ReadPlanBuilder` with the given batch size
40 pub(crate) fn new(batch_size: usize) -> Self {
41 Self {
42 batch_size,
43 selection: None,
44 }
45 }
46
47 /// Set the current selection to the given value
48 pub(crate) fn with_selection(mut self, selection: Option<RowSelection>) -> Self {
49 self.selection = selection;
50 self
51 }
52
53 /// Returns the current selection, if any
54 #[cfg(feature = "async")]
55 pub(crate) fn selection(&self) -> Option<&RowSelection> {
56 self.selection.as_ref()
57 }
58
59 /// Specifies the number of rows in the row group, before filtering is applied.
60 ///
61 /// Returns a [`LimitedReadPlanBuilder`] that can apply
62 /// offset and limit.
63 ///
64 /// Call [`LimitedReadPlanBuilder::build_limited`] to apply the limits to this
65 /// selection.
66 pub(crate) fn limited(self, row_count: usize) -> LimitedReadPlanBuilder {
67 LimitedReadPlanBuilder::new(self, row_count)
68 }
69
70 /// Returns true if the current plan selects any rows
71 pub(crate) fn selects_any(&self) -> bool {
72 self.selection
73 .as_ref()
74 .map(|s| s.selects_any())
75 .unwrap_or(true)
76 }
77
78 /// Returns the number of rows selected, or `None` if all rows are selected.
79 #[cfg(feature = "async")]
80 pub(crate) fn num_rows_selected(&self) -> Option<usize> {
81 self.selection.as_ref().map(|s| s.row_count())
82 }
83
84 /// Evaluates an [`ArrowPredicate`], updating this plan's `selection`
85 ///
86 /// If the current `selection` is `Some`, the resulting [`RowSelection`]
87 /// will be the conjunction of the existing selection and the rows selected
88 /// by `predicate`.
89 ///
90 /// Note: pre-existing selections may come from evaluating a previous predicate
91 /// or if the [`ParquetRecordBatchReader`] specified an explicit
92 /// [`RowSelection`] in addition to one or more predicates.
93 pub(crate) fn with_predicate(
94 mut self,
95 array_reader: Box<dyn ArrayReader>,
96 predicate: &mut dyn ArrowPredicate,
97 ) -> Result<Self> {
98 let reader = ParquetRecordBatchReader::new(array_reader, self.clone().build());
99 let mut filters = vec![];
100 for maybe_batch in reader {
101 let maybe_batch = maybe_batch?;
102 let input_rows = maybe_batch.num_rows();
103 let filter = predicate.evaluate(maybe_batch)?;
104 // Since user supplied predicate, check error here to catch bugs quickly
105 if filter.len() != input_rows {
106 return Err(arrow_err!(
107 "ArrowPredicate predicate returned {} rows, expected {input_rows}",
108 filter.len()
109 ));
110 }
111 match filter.null_count() {
112 0 => filters.push(filter),
113 _ => filters.push(prep_null_mask_filter(&filter)),
114 };
115 }
116
117 let raw = RowSelection::from_filters(&filters);
118 self.selection = match self.selection.take() {
119 Some(selection) => Some(selection.and_then(&raw)),
120 None => Some(raw),
121 };
122 Ok(self)
123 }
124
125 /// Create a final `ReadPlan` the read plan for the scan
126 pub(crate) fn build(mut self) -> ReadPlan {
127 // If selection is empty, truncate
128 if !self.selects_any() {
129 self.selection = Some(RowSelection::from(vec![]));
130 }
131 let Self {
132 batch_size,
133 selection,
134 } = self;
135
136 let selection = selection.map(|s| s.trim().into());
137
138 ReadPlan {
139 batch_size,
140 selection,
141 }
142 }
143}
144
145/// Builder for [`ReadPlan`] that applies a limit and offset to the read plan
146///
147/// See [`ReadPlanBuilder::limited`] to create this builder.
148pub(crate) struct LimitedReadPlanBuilder {
149 /// The underlying builder
150 inner: ReadPlanBuilder,
151 /// Total number of rows in the row group before the selection, limit or
152 /// offset are applied
153 row_count: usize,
154 /// The offset to apply, if any
155 offset: Option<usize>,
156 /// The limit to apply, if any
157 limit: Option<usize>,
158}
159
160impl LimitedReadPlanBuilder {
161 /// Create a new `LimitedReadPlanBuilder` from the existing builder and number of rows
162 fn new(inner: ReadPlanBuilder, row_count: usize) -> Self {
163 Self {
164 inner,
165 row_count,
166 offset: None,
167 limit: None,
168 }
169 }
170
171 /// Set the offset to apply to the read plan
172 pub(crate) fn with_offset(mut self, offset: Option<usize>) -> Self {
173 self.offset = offset;
174 self
175 }
176
177 /// Set the limit to apply to the read plan
178 pub(crate) fn with_limit(mut self, limit: Option<usize>) -> Self {
179 self.limit = limit;
180 self
181 }
182
183 /// Apply offset and limit, updating the selection on the underlying builder
184 /// and returning it.
185 pub(crate) fn build_limited(self) -> ReadPlanBuilder {
186 let Self {
187 mut inner,
188 row_count,
189 offset,
190 limit,
191 } = self;
192
193 // If the selection is empty, truncate
194 if !inner.selects_any() {
195 inner.selection = Some(RowSelection::from(vec![]));
196 }
197
198 // If an offset is defined, apply it to the `selection`
199 if let Some(offset) = offset {
200 inner.selection = Some(match row_count.checked_sub(offset) {
201 None => RowSelection::from(vec![]),
202 Some(remaining) => inner
203 .selection
204 .map(|selection| selection.offset(offset))
205 .unwrap_or_else(|| {
206 RowSelection::from(vec![
207 RowSelector::skip(offset),
208 RowSelector::select(remaining),
209 ])
210 }),
211 });
212 }
213
214 // If a limit is defined, apply it to the final `selection`
215 if let Some(limit) = limit {
216 inner.selection = Some(
217 inner
218 .selection
219 .map(|selection| selection.limit(limit))
220 .unwrap_or_else(|| {
221 RowSelection::from(vec![RowSelector::select(limit.min(row_count))])
222 }),
223 );
224 }
225
226 inner
227 }
228}
229
230/// A plan reading specific rows from a Parquet Row Group.
231///
232/// See [`ReadPlanBuilder`] to create `ReadPlan`s
233pub(crate) struct ReadPlan {
234 /// The number of rows to read in each batch
235 batch_size: usize,
236 /// Row ranges to be selected from the data source
237 selection: Option<VecDeque<RowSelector>>,
238}
239
240impl ReadPlan {
241 /// Returns a mutable reference to the selection, if any
242 pub(crate) fn selection_mut(&mut self) -> Option<&mut VecDeque<RowSelector>> {
243 self.selection.as_mut()
244 }
245
246 /// Return the number of rows to read in each output batch
247 #[inline(always)]
248 pub fn batch_size(&self) -> usize {
249 self.batch_size
250 }
251}