parquet/arrow/arrow_reader/
read_plan.rs1use crate::arrow::array_reader::ArrayReader;
22use crate::arrow::arrow_reader::selection::RowSelectionPolicy;
23use crate::arrow::arrow_reader::selection::RowSelectionStrategy;
24use crate::arrow::arrow_reader::{
25 ArrowPredicate, ParquetRecordBatchReader, RowSelection, RowSelectionCursor, RowSelector,
26};
27use crate::errors::{ParquetError, Result};
28use arrow_array::Array;
29use arrow_select::filter::prep_null_mask_filter;
30use std::collections::VecDeque;
31
32#[derive(Clone, Debug)]
34pub struct ReadPlanBuilder {
35 batch_size: usize,
36 selection: Option<RowSelection>,
38 row_selection_policy: RowSelectionPolicy,
40}
41
42impl ReadPlanBuilder {
43 pub fn new(batch_size: usize) -> Self {
45 Self {
46 batch_size,
47 selection: None,
48 row_selection_policy: RowSelectionPolicy::default(),
49 }
50 }
51
52 pub fn with_selection(mut self, selection: Option<RowSelection>) -> Self {
54 self.selection = selection;
55 self
56 }
57
58 pub fn with_row_selection_policy(mut self, policy: RowSelectionPolicy) -> Self {
62 self.row_selection_policy = policy;
63 self
64 }
65
66 pub fn row_selection_policy(&self) -> &RowSelectionPolicy {
68 &self.row_selection_policy
69 }
70
71 pub fn selection(&self) -> Option<&RowSelection> {
73 self.selection.as_ref()
74 }
75
76 pub(crate) fn limited(self, row_count: usize) -> LimitedReadPlanBuilder {
84 LimitedReadPlanBuilder::new(self, row_count)
85 }
86
87 pub fn selects_any(&self) -> bool {
89 self.selection
90 .as_ref()
91 .map(|s| s.selects_any())
92 .unwrap_or(true)
93 }
94
95 pub fn num_rows_selected(&self) -> Option<usize> {
97 self.selection.as_ref().map(|s| s.row_count())
98 }
99
100 pub(crate) fn resolve_selection_strategy(&self) -> RowSelectionStrategy {
104 match self.row_selection_policy {
105 RowSelectionPolicy::Selectors => RowSelectionStrategy::Selectors,
106 RowSelectionPolicy::Mask => RowSelectionStrategy::Mask,
107 RowSelectionPolicy::Auto { threshold, .. } => {
108 let selection = match self.selection.as_ref() {
109 Some(selection) => selection,
110 None => return RowSelectionStrategy::Selectors,
111 };
112
113 let (total_rows, effective_count) =
116 selection.iter().fold((0usize, 0usize), |(rows, count), s| {
117 if s.row_count > 0 {
118 (rows + s.row_count, count + 1)
119 } else {
120 (rows, count)
121 }
122 });
123
124 if effective_count == 0 {
125 return RowSelectionStrategy::Mask;
126 }
127
128 if total_rows < effective_count.saturating_mul(threshold) {
129 RowSelectionStrategy::Mask
130 } else {
131 RowSelectionStrategy::Selectors
132 }
133 }
134 }
135 }
136
137 pub fn with_predicate(
147 mut self,
148 array_reader: Box<dyn ArrayReader>,
149 predicate: &mut dyn ArrowPredicate,
150 ) -> Result<Self> {
151 let reader = ParquetRecordBatchReader::new(array_reader, self.clone().build());
152 let mut filters = vec![];
153 for maybe_batch in reader {
154 let maybe_batch = maybe_batch?;
155 let input_rows = maybe_batch.num_rows();
156 let filter = predicate.evaluate(maybe_batch)?;
157 if filter.len() != input_rows {
159 return Err(arrow_err!(
160 "ArrowPredicate predicate returned {} rows, expected {input_rows}",
161 filter.len()
162 ));
163 }
164 match filter.null_count() {
165 0 => filters.push(filter),
166 _ => filters.push(prep_null_mask_filter(&filter)),
167 };
168 }
169
170 let all_selected = filters.iter().all(|f| f.true_count() == f.len());
174 if all_selected && self.selection.is_none() {
175 return Ok(self);
176 }
177 let raw = RowSelection::from_filters(&filters);
178 self.selection = match self.selection.take() {
179 Some(selection) => Some(selection.and_then(&raw)),
180 None => Some(raw),
181 };
182 Ok(self)
183 }
184
185 pub fn build(mut self) -> ReadPlan {
187 if !self.selects_any() {
189 self.selection = Some(RowSelection::from(vec![]));
190 }
191
192 let selection_strategy = self.resolve_selection_strategy();
194
195 let Self {
196 batch_size,
197 selection,
198 row_selection_policy: _,
199 } = self;
200
201 let selection = selection.map(|s| s.trim());
202
203 let row_selection_cursor = selection
204 .map(|s| {
205 let trimmed = s.trim();
206 let selectors: Vec<RowSelector> = trimmed.into();
207 match selection_strategy {
208 RowSelectionStrategy::Mask => {
209 RowSelectionCursor::new_mask_from_selectors(selectors)
210 }
211 RowSelectionStrategy::Selectors => RowSelectionCursor::new_selectors(selectors),
212 }
213 })
214 .unwrap_or(RowSelectionCursor::new_all());
215
216 ReadPlan {
217 batch_size,
218 row_selection_cursor,
219 }
220 }
221}
222
223pub(crate) struct LimitedReadPlanBuilder {
227 inner: ReadPlanBuilder,
229 row_count: usize,
232 offset: Option<usize>,
234 limit: Option<usize>,
236}
237
238impl LimitedReadPlanBuilder {
239 fn new(inner: ReadPlanBuilder, row_count: usize) -> Self {
241 Self {
242 inner,
243 row_count,
244 offset: None,
245 limit: None,
246 }
247 }
248
249 pub(crate) fn with_offset(mut self, offset: Option<usize>) -> Self {
251 self.offset = offset;
252 self
253 }
254
255 pub(crate) fn with_limit(mut self, limit: Option<usize>) -> Self {
257 self.limit = limit;
258 self
259 }
260
261 pub(crate) fn build_limited(self) -> ReadPlanBuilder {
264 let Self {
265 mut inner,
266 row_count,
267 offset,
268 limit,
269 } = self;
270
271 if !inner.selects_any() {
273 inner.selection = Some(RowSelection::from(vec![]));
274 }
275
276 if let Some(offset) = offset {
278 inner.selection = Some(match row_count.checked_sub(offset) {
279 None => RowSelection::from(vec![]),
280 Some(remaining) => inner
281 .selection
282 .map(|selection| selection.offset(offset))
283 .unwrap_or_else(|| {
284 RowSelection::from(vec![
285 RowSelector::skip(offset),
286 RowSelector::select(remaining),
287 ])
288 }),
289 });
290 }
291
292 if let Some(limit) = limit {
294 inner.selection = Some(
295 inner
296 .selection
297 .map(|selection| selection.limit(limit))
298 .unwrap_or_else(|| {
299 RowSelection::from(vec![RowSelector::select(limit.min(row_count))])
300 }),
301 );
302 }
303
304 inner
305 }
306}
307
308#[derive(Debug)]
312pub struct ReadPlan {
313 batch_size: usize,
315 row_selection_cursor: RowSelectionCursor,
317}
318
319impl ReadPlan {
320 #[deprecated(since = "57.1.0", note = "Use `row_selection_cursor_mut` instead")]
322 pub fn selection_mut(&mut self) -> Option<&mut VecDeque<RowSelector>> {
323 if let RowSelectionCursor::Selectors(selectors_cursor) = &mut self.row_selection_cursor {
324 Some(selectors_cursor.selectors_mut())
325 } else {
326 None
327 }
328 }
329
330 pub fn row_selection_cursor_mut(&mut self) -> &mut RowSelectionCursor {
332 &mut self.row_selection_cursor
333 }
334
335 #[inline(always)]
337 pub fn batch_size(&self) -> usize {
338 self.batch_size
339 }
340}
341
#[cfg(test)]
mod tests {
    use super::*;

    /// Resolves the strategy for a builder (batch size 1024) whose selection
    /// is a single `select(8)`. `policy` overrides the builder's default
    /// policy when it is `Some`.
    fn strategy_for_eight_selected_rows(
        policy: Option<RowSelectionPolicy>,
    ) -> RowSelectionStrategy {
        let rows = RowSelection::from(vec![RowSelector::select(8)]);
        let plan_builder = ReadPlanBuilder::new(1024).with_selection(Some(rows));
        let plan_builder = match policy {
            Some(policy) => plan_builder.with_row_selection_policy(policy),
            None => plan_builder,
        };
        plan_builder.resolve_selection_strategy()
    }

    #[test]
    fn preferred_selection_strategy_prefers_mask_by_default() {
        let resolved = strategy_for_eight_selected_rows(None);
        assert_eq!(resolved, RowSelectionStrategy::Mask);
    }

    #[test]
    fn preferred_selection_strategy_prefers_selectors_when_threshold_small() {
        // With a threshold of 1, an 8-row selector is not below the threshold.
        let resolved =
            strategy_for_eight_selected_rows(Some(RowSelectionPolicy::Auto { threshold: 1 }));
        assert_eq!(resolved, RowSelectionStrategy::Selectors);
    }
}