parquet/arrow/arrow_reader/read_plan.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`ReadPlan`] and [`ReadPlanBuilder`] for determining which rows to read
19//! from a Parquet file
20
21use crate::arrow::array_reader::ArrayReader;
22use crate::arrow::arrow_reader::{
23 ArrowPredicate, ParquetRecordBatchReader, RowSelection, RowSelector,
24};
25use crate::errors::{ParquetError, Result};
26use arrow_array::Array;
27use arrow_select::filter::prep_null_mask_filter;
28use std::collections::VecDeque;
29
30/// A builder for [`ReadPlan`]
31#[derive(Clone)]
32pub(crate) struct ReadPlanBuilder {
33 batch_size: usize,
34 /// Current to apply, includes all filters
35 selection: Option<RowSelection>,
36}
37
38impl ReadPlanBuilder {
39 /// Create a `ReadPlanBuilder` with the given batch size
40 pub(crate) fn new(batch_size: usize) -> Self {
41 Self {
42 batch_size,
43 selection: None,
44 }
45 }
46
47 /// Set the current selection to the given value
48 pub(crate) fn with_selection(mut self, selection: Option<RowSelection>) -> Self {
49 self.selection = selection;
50 self
51 }
52
53 /// Returns the current selection, if any
54 pub(crate) fn selection(&self) -> Option<&RowSelection> {
55 self.selection.as_ref()
56 }
57
58 /// Specifies the number of rows in the row group, before filtering is applied.
59 ///
60 /// Returns a [`LimitedReadPlanBuilder`] that can apply
61 /// offset and limit.
62 ///
63 /// Call [`LimitedReadPlanBuilder::build_limited`] to apply the limits to this
64 /// selection.
65 pub(crate) fn limited(self, row_count: usize) -> LimitedReadPlanBuilder {
66 LimitedReadPlanBuilder::new(self, row_count)
67 }
68
69 /// Returns true if the current plan selects any rows
70 pub(crate) fn selects_any(&self) -> bool {
71 self.selection
72 .as_ref()
73 .map(|s| s.selects_any())
74 .unwrap_or(true)
75 }
76
77 /// Returns the number of rows selected, or `None` if all rows are selected.
78 pub(crate) fn num_rows_selected(&self) -> Option<usize> {
79 self.selection.as_ref().map(|s| s.row_count())
80 }
81
82 /// Evaluates an [`ArrowPredicate`], updating this plan's `selection`
83 ///
84 /// If the current `selection` is `Some`, the resulting [`RowSelection`]
85 /// will be the conjunction of the existing selection and the rows selected
86 /// by `predicate`.
87 ///
88 /// Note: pre-existing selections may come from evaluating a previous predicate
89 /// or if the [`ParquetRecordBatchReader`] specified an explicit
90 /// [`RowSelection`] in addition to one or more predicates.
91 pub(crate) fn with_predicate(
92 mut self,
93 array_reader: Box<dyn ArrayReader>,
94 predicate: &mut dyn ArrowPredicate,
95 ) -> Result<Self> {
96 let reader = ParquetRecordBatchReader::new(array_reader, self.clone().build());
97 let mut filters = vec![];
98 for maybe_batch in reader {
99 let maybe_batch = maybe_batch?;
100 let input_rows = maybe_batch.num_rows();
101 let filter = predicate.evaluate(maybe_batch)?;
102 // Since user supplied predicate, check error here to catch bugs quickly
103 if filter.len() != input_rows {
104 return Err(arrow_err!(
105 "ArrowPredicate predicate returned {} rows, expected {input_rows}",
106 filter.len()
107 ));
108 }
109 match filter.null_count() {
110 0 => filters.push(filter),
111 _ => filters.push(prep_null_mask_filter(&filter)),
112 };
113 }
114
115 let raw = RowSelection::from_filters(&filters);
116 self.selection = match self.selection.take() {
117 Some(selection) => Some(selection.and_then(&raw)),
118 None => Some(raw),
119 };
120 Ok(self)
121 }
122
123 /// Create a final `ReadPlan` the read plan for the scan
124 pub(crate) fn build(mut self) -> ReadPlan {
125 // If selection is empty, truncate
126 if !self.selects_any() {
127 self.selection = Some(RowSelection::from(vec![]));
128 }
129 let Self {
130 batch_size,
131 selection,
132 } = self;
133
134 let selection = selection.map(|s| s.trim().into());
135
136 ReadPlan {
137 batch_size,
138 selection,
139 }
140 }
141}
142
143/// Builder for [`ReadPlan`] that applies a limit and offset to the read plan
144///
145/// See [`ReadPlanBuilder::limited`] to create this builder.
146pub(crate) struct LimitedReadPlanBuilder {
147 /// The underlying builder
148 inner: ReadPlanBuilder,
149 /// Total number of rows in the row group before the selection, limit or
150 /// offset are applied
151 row_count: usize,
152 /// The offset to apply, if any
153 offset: Option<usize>,
154 /// The limit to apply, if any
155 limit: Option<usize>,
156}
157
158impl LimitedReadPlanBuilder {
159 /// Create a new `LimitedReadPlanBuilder` from the existing builder and number of rows
160 fn new(inner: ReadPlanBuilder, row_count: usize) -> Self {
161 Self {
162 inner,
163 row_count,
164 offset: None,
165 limit: None,
166 }
167 }
168
169 /// Set the offset to apply to the read plan
170 pub(crate) fn with_offset(mut self, offset: Option<usize>) -> Self {
171 self.offset = offset;
172 self
173 }
174
175 /// Set the limit to apply to the read plan
176 pub(crate) fn with_limit(mut self, limit: Option<usize>) -> Self {
177 self.limit = limit;
178 self
179 }
180
181 /// Apply offset and limit, updating the selection on the underlying builder
182 /// and returning it.
183 pub(crate) fn build_limited(self) -> ReadPlanBuilder {
184 let Self {
185 mut inner,
186 row_count,
187 offset,
188 limit,
189 } = self;
190
191 // If the selection is empty, truncate
192 if !inner.selects_any() {
193 inner.selection = Some(RowSelection::from(vec![]));
194 }
195
196 // If an offset is defined, apply it to the `selection`
197 if let Some(offset) = offset {
198 inner.selection = Some(match row_count.checked_sub(offset) {
199 None => RowSelection::from(vec![]),
200 Some(remaining) => inner
201 .selection
202 .map(|selection| selection.offset(offset))
203 .unwrap_or_else(|| {
204 RowSelection::from(vec![
205 RowSelector::skip(offset),
206 RowSelector::select(remaining),
207 ])
208 }),
209 });
210 }
211
212 // If a limit is defined, apply it to the final `selection`
213 if let Some(limit) = limit {
214 inner.selection = Some(
215 inner
216 .selection
217 .map(|selection| selection.limit(limit))
218 .unwrap_or_else(|| {
219 RowSelection::from(vec![RowSelector::select(limit.min(row_count))])
220 }),
221 );
222 }
223
224 inner
225 }
226}
227
228/// A plan reading specific rows from a Parquet Row Group.
229///
230/// See [`ReadPlanBuilder`] to create `ReadPlan`s
231pub(crate) struct ReadPlan {
232 /// The number of rows to read in each batch
233 batch_size: usize,
234 /// Row ranges to be selected from the data source
235 selection: Option<VecDeque<RowSelector>>,
236}
237
238impl ReadPlan {
239 /// Returns a mutable reference to the selection, if any
240 pub(crate) fn selection_mut(&mut self) -> Option<&mut VecDeque<RowSelector>> {
241 self.selection.as_mut()
242 }
243
244 /// Return the number of rows to read in each output batch
245 #[inline(always)]
246 pub fn batch_size(&self) -> usize {
247 self.batch_size
248 }
249}