parquet/arrow/arrow_reader/
read_plan.rs1use crate::arrow::array_reader::ArrayReader;
22use crate::arrow::arrow_reader::selection::RowSelectionPolicy;
23use crate::arrow::arrow_reader::selection::RowSelectionStrategy;
24use crate::arrow::arrow_reader::{
25 ArrowPredicate, ParquetRecordBatchReader, RowSelection, RowSelectionCursor, RowSelector,
26};
27use crate::errors::{ParquetError, Result};
28use arrow_array::Array;
29use arrow_select::filter::prep_null_mask_filter;
30use std::collections::VecDeque;
31
32#[derive(Clone, Debug)]
34pub struct ReadPlanBuilder {
35 batch_size: usize,
36 selection: Option<RowSelection>,
38 row_selection_policy: RowSelectionPolicy,
40}
41
42impl ReadPlanBuilder {
43 pub fn new(batch_size: usize) -> Self {
45 Self {
46 batch_size,
47 selection: None,
48 row_selection_policy: RowSelectionPolicy::default(),
49 }
50 }
51
52 pub fn with_selection(mut self, selection: Option<RowSelection>) -> Self {
54 self.selection = selection;
55 self
56 }
57
58 pub fn with_row_selection_policy(mut self, policy: RowSelectionPolicy) -> Self {
62 self.row_selection_policy = policy;
63 self
64 }
65
66 pub fn row_selection_policy(&self) -> &RowSelectionPolicy {
68 &self.row_selection_policy
69 }
70
71 pub fn selection(&self) -> Option<&RowSelection> {
73 self.selection.as_ref()
74 }
75
76 pub(crate) fn limited(self, row_count: usize) -> LimitedReadPlanBuilder {
84 LimitedReadPlanBuilder::new(self, row_count)
85 }
86
87 pub fn selects_any(&self) -> bool {
89 self.selection
90 .as_ref()
91 .map(|s| s.selects_any())
92 .unwrap_or(true)
93 }
94
95 pub fn num_rows_selected(&self) -> Option<usize> {
97 self.selection.as_ref().map(|s| s.row_count())
98 }
99
100 pub(crate) fn resolve_selection_strategy(&self) -> RowSelectionStrategy {
104 match self.row_selection_policy {
105 RowSelectionPolicy::Selectors => RowSelectionStrategy::Selectors,
106 RowSelectionPolicy::Mask => RowSelectionStrategy::Mask,
107 RowSelectionPolicy::Auto { threshold, .. } => {
108 let selection = match self.selection.as_ref() {
109 Some(selection) => selection,
110 None => return RowSelectionStrategy::Selectors,
111 };
112
113 let trimmed = selection.clone().trim();
114 let selectors: Vec<RowSelector> = trimmed.into();
115 if selectors.is_empty() {
116 return RowSelectionStrategy::Mask;
117 }
118
119 let total_rows: usize = selectors.iter().map(|s| s.row_count).sum();
120 let selector_count = selectors.len();
121 if selector_count == 0 {
122 return RowSelectionStrategy::Mask;
123 }
124
125 if total_rows < selector_count.saturating_mul(threshold) {
126 RowSelectionStrategy::Mask
127 } else {
128 RowSelectionStrategy::Selectors
129 }
130 }
131 }
132 }
133
134 pub fn with_predicate(
144 mut self,
145 array_reader: Box<dyn ArrayReader>,
146 predicate: &mut dyn ArrowPredicate,
147 ) -> Result<Self> {
148 let reader = ParquetRecordBatchReader::new(array_reader, self.clone().build());
149 let mut filters = vec![];
150 for maybe_batch in reader {
151 let maybe_batch = maybe_batch?;
152 let input_rows = maybe_batch.num_rows();
153 let filter = predicate.evaluate(maybe_batch)?;
154 if filter.len() != input_rows {
156 return Err(arrow_err!(
157 "ArrowPredicate predicate returned {} rows, expected {input_rows}",
158 filter.len()
159 ));
160 }
161 match filter.null_count() {
162 0 => filters.push(filter),
163 _ => filters.push(prep_null_mask_filter(&filter)),
164 };
165 }
166
167 let raw = RowSelection::from_filters(&filters);
168 self.selection = match self.selection.take() {
169 Some(selection) => Some(selection.and_then(&raw)),
170 None => Some(raw),
171 };
172 Ok(self)
173 }
174
175 pub fn build(mut self) -> ReadPlan {
177 if !self.selects_any() {
179 self.selection = Some(RowSelection::from(vec![]));
180 }
181
182 let selection_strategy = self.resolve_selection_strategy();
184
185 let Self {
186 batch_size,
187 selection,
188 row_selection_policy: _,
189 } = self;
190
191 let selection = selection.map(|s| s.trim());
192
193 let row_selection_cursor = selection
194 .map(|s| {
195 let trimmed = s.trim();
196 let selectors: Vec<RowSelector> = trimmed.into();
197 match selection_strategy {
198 RowSelectionStrategy::Mask => {
199 RowSelectionCursor::new_mask_from_selectors(selectors)
200 }
201 RowSelectionStrategy::Selectors => RowSelectionCursor::new_selectors(selectors),
202 }
203 })
204 .unwrap_or(RowSelectionCursor::new_all());
205
206 ReadPlan {
207 batch_size,
208 row_selection_cursor,
209 }
210 }
211}
212
213pub(crate) struct LimitedReadPlanBuilder {
217 inner: ReadPlanBuilder,
219 row_count: usize,
222 offset: Option<usize>,
224 limit: Option<usize>,
226}
227
228impl LimitedReadPlanBuilder {
229 fn new(inner: ReadPlanBuilder, row_count: usize) -> Self {
231 Self {
232 inner,
233 row_count,
234 offset: None,
235 limit: None,
236 }
237 }
238
239 pub(crate) fn with_offset(mut self, offset: Option<usize>) -> Self {
241 self.offset = offset;
242 self
243 }
244
245 pub(crate) fn with_limit(mut self, limit: Option<usize>) -> Self {
247 self.limit = limit;
248 self
249 }
250
251 pub(crate) fn build_limited(self) -> ReadPlanBuilder {
254 let Self {
255 mut inner,
256 row_count,
257 offset,
258 limit,
259 } = self;
260
261 if !inner.selects_any() {
263 inner.selection = Some(RowSelection::from(vec![]));
264 }
265
266 if let Some(offset) = offset {
268 inner.selection = Some(match row_count.checked_sub(offset) {
269 None => RowSelection::from(vec![]),
270 Some(remaining) => inner
271 .selection
272 .map(|selection| selection.offset(offset))
273 .unwrap_or_else(|| {
274 RowSelection::from(vec![
275 RowSelector::skip(offset),
276 RowSelector::select(remaining),
277 ])
278 }),
279 });
280 }
281
282 if let Some(limit) = limit {
284 inner.selection = Some(
285 inner
286 .selection
287 .map(|selection| selection.limit(limit))
288 .unwrap_or_else(|| {
289 RowSelection::from(vec![RowSelector::select(limit.min(row_count))])
290 }),
291 );
292 }
293
294 inner
295 }
296}
297
298#[derive(Debug)]
302pub struct ReadPlan {
303 batch_size: usize,
305 row_selection_cursor: RowSelectionCursor,
307}
308
309impl ReadPlan {
310 #[deprecated(since = "57.1.0", note = "Use `row_selection_cursor_mut` instead")]
312 pub fn selection_mut(&mut self) -> Option<&mut VecDeque<RowSelector>> {
313 if let RowSelectionCursor::Selectors(selectors_cursor) = &mut self.row_selection_cursor {
314 Some(selectors_cursor.selectors_mut())
315 } else {
316 None
317 }
318 }
319
320 pub fn row_selection_cursor_mut(&mut self) -> &mut RowSelectionCursor {
322 &mut self.row_selection_cursor
323 }
324
325 #[inline(always)]
327 pub fn batch_size(&self) -> usize {
328 self.batch_size
329 }
330}
331
#[cfg(test)]
mod tests {
    use super::*;

    /// Builder with a batch size of 1024 and the given selection.
    fn builder_with_selection(selection: RowSelection) -> ReadPlanBuilder {
        ReadPlanBuilder::new(1024).with_selection(Some(selection))
    }

    #[test]
    fn preferred_selection_strategy_prefers_mask_by_default() {
        let selection = RowSelection::from(vec![RowSelector::select(8)]);
        let builder = builder_with_selection(selection);
        assert_eq!(
            builder.resolve_selection_strategy(),
            RowSelectionStrategy::Mask
        );
    }

    #[test]
    fn preferred_selection_strategy_prefers_selectors_when_threshold_small() {
        let selection = RowSelection::from(vec![RowSelector::select(8)]);
        let builder = builder_with_selection(selection)
            .with_row_selection_policy(RowSelectionPolicy::Auto { threshold: 1 });
        assert_eq!(
            builder.resolve_selection_strategy(),
            RowSelectionStrategy::Selectors
        );
    }

    #[test]
    fn preferred_selection_strategy_without_selection_prefers_selectors() {
        let builder = ReadPlanBuilder::new(1024);
        assert_eq!(
            builder.resolve_selection_strategy(),
            RowSelectionStrategy::Selectors
        );
    }

    #[test]
    fn explicit_policies_ignore_threshold() {
        let selection = RowSelection::from(vec![RowSelector::select(8)]);

        let builder = builder_with_selection(selection.clone())
            .with_row_selection_policy(RowSelectionPolicy::Selectors);
        assert_eq!(
            builder.resolve_selection_strategy(),
            RowSelectionStrategy::Selectors
        );

        let builder =
            builder_with_selection(selection).with_row_selection_policy(RowSelectionPolicy::Mask);
        assert_eq!(
            builder.resolve_selection_strategy(),
            RowSelectionStrategy::Mask
        );
    }

    #[test]
    fn preferred_selection_strategy_ignores_trailing_skips() {
        // Untrimmed: 108 rows over 2 selectors (avg 54 >= 10) would pick Selectors.
        // Trimmed:   8 rows over 1 selector   (avg 8 < 10) picks Mask.
        let selection =
            RowSelection::from(vec![RowSelector::select(8), RowSelector::skip(100)]);
        let builder = builder_with_selection(selection)
            .with_row_selection_policy(RowSelectionPolicy::Auto { threshold: 10 });
        assert_eq!(
            builder.resolve_selection_strategy(),
            RowSelectionStrategy::Mask
        );
    }

    #[test]
    fn selects_any_and_num_rows_selected() {
        let builder = ReadPlanBuilder::new(1024);
        assert!(builder.selects_any());
        assert_eq!(builder.num_rows_selected(), None);

        let builder = builder_with_selection(RowSelection::from(vec![
            RowSelector::skip(2),
            RowSelector::select(3),
        ]));
        assert!(builder.selects_any());
        assert_eq!(builder.num_rows_selected(), Some(3));

        let builder = builder_with_selection(RowSelection::from(vec![RowSelector::skip(5)]));
        assert!(!builder.selects_any());
        assert_eq!(builder.num_rows_selected(), Some(0));
    }

    #[test]
    fn build_uses_selector_cursor_for_selectors_policy() {
        let selection = RowSelection::from(vec![RowSelector::skip(2), RowSelector::select(3)]);
        let mut plan = builder_with_selection(selection)
            .with_row_selection_policy(RowSelectionPolicy::Selectors)
            .build();
        assert_eq!(plan.batch_size(), 1024);
        assert!(matches!(
            plan.row_selection_cursor_mut(),
            RowSelectionCursor::Selectors(_)
        ));
    }

    #[test]
    fn limited_without_selection_applies_offset_then_limit() {
        let builder = ReadPlanBuilder::new(1024)
            .limited(10)
            .with_offset(Some(3))
            .with_limit(Some(4))
            .build_limited();
        assert_eq!(builder.num_rows_selected(), Some(4));
    }

    #[test]
    fn limited_offset_past_end_selects_nothing() {
        let builder = ReadPlanBuilder::new(1024)
            .limited(10)
            .with_offset(Some(20))
            .build_limited();
        assert!(!builder.selects_any());
        assert_eq!(builder.num_rows_selected(), Some(0));
    }

    #[test]
    fn limited_limit_is_capped_by_row_count() {
        let builder = ReadPlanBuilder::new(1024)
            .limited(10)
            .with_limit(Some(50))
            .build_limited();
        assert_eq!(builder.num_rows_selected(), Some(10));
    }
}