parquet/arrow/arrow_reader/
read_plan.rs1use crate::arrow::array_reader::ArrayReader;
22use crate::arrow::arrow_reader::selection::RowSelectionPolicy;
23use crate::arrow::arrow_reader::selection::RowSelectionStrategy;
24use crate::arrow::arrow_reader::{
25 ArrowPredicate, ParquetRecordBatchReader, RowSelection, RowSelectionCursor, RowSelector,
26};
27use crate::errors::{ParquetError, Result};
28use arrow_array::Array;
29use arrow_select::filter::prep_null_mask_filter;
30use std::collections::VecDeque;
31
32#[derive(Clone, Debug)]
34pub struct ReadPlanBuilder {
35 batch_size: usize,
36 selection: Option<RowSelection>,
38 row_selection_policy: RowSelectionPolicy,
40}
41
42impl ReadPlanBuilder {
43 pub fn new(batch_size: usize) -> Self {
45 Self {
46 batch_size,
47 selection: None,
48 row_selection_policy: RowSelectionPolicy::default(),
49 }
50 }
51
52 pub fn with_selection(mut self, selection: Option<RowSelection>) -> Self {
54 self.selection = selection;
55 self
56 }
57
58 pub fn with_row_selection_policy(mut self, policy: RowSelectionPolicy) -> Self {
62 self.row_selection_policy = policy;
63 self
64 }
65
66 pub fn row_selection_policy(&self) -> &RowSelectionPolicy {
68 &self.row_selection_policy
69 }
70
71 pub fn selection(&self) -> Option<&RowSelection> {
73 self.selection.as_ref()
74 }
75
76 pub(crate) fn limited(self, row_count: usize) -> LimitedReadPlanBuilder {
84 LimitedReadPlanBuilder::new(self, row_count)
85 }
86
87 pub fn selects_any(&self) -> bool {
89 self.selection
90 .as_ref()
91 .map(|s| s.selects_any())
92 .unwrap_or(true)
93 }
94
95 pub fn num_rows_selected(&self) -> Option<usize> {
97 self.selection.as_ref().map(|s| s.row_count())
98 }
99
100 pub(crate) fn resolve_selection_strategy(&self) -> RowSelectionStrategy {
104 match self.row_selection_policy {
105 RowSelectionPolicy::Selectors => RowSelectionStrategy::Selectors,
106 RowSelectionPolicy::Mask => RowSelectionStrategy::Mask,
107 RowSelectionPolicy::Auto { threshold, .. } => {
108 let selection = match self.selection.as_ref() {
109 Some(selection) => selection,
110 None => return RowSelectionStrategy::Selectors,
111 };
112
113 let (total_rows, effective_count) =
116 selection.iter().fold((0usize, 0usize), |(rows, count), s| {
117 if s.row_count > 0 {
118 (rows + s.row_count, count + 1)
119 } else {
120 (rows, count)
121 }
122 });
123
124 if effective_count == 0 {
125 return RowSelectionStrategy::Mask;
126 }
127
128 if total_rows < effective_count.saturating_mul(threshold) {
129 RowSelectionStrategy::Mask
130 } else {
131 RowSelectionStrategy::Selectors
132 }
133 }
134 }
135 }
136
137 pub fn with_predicate(
147 mut self,
148 array_reader: Box<dyn ArrayReader>,
149 predicate: &mut dyn ArrowPredicate,
150 ) -> Result<Self> {
151 let reader = ParquetRecordBatchReader::new(array_reader, self.clone().build());
152 let mut filters = vec![];
153 for maybe_batch in reader {
154 let maybe_batch = maybe_batch?;
155 let input_rows = maybe_batch.num_rows();
156 let filter = predicate.evaluate(maybe_batch)?;
157 if filter.len() != input_rows {
159 return Err(arrow_err!(
160 "ArrowPredicate predicate returned {} rows, expected {input_rows}",
161 filter.len()
162 ));
163 }
164 match filter.null_count() {
165 0 => filters.push(filter),
166 _ => filters.push(prep_null_mask_filter(&filter)),
167 };
168 }
169
170 let raw = RowSelection::from_filters(&filters);
171 self.selection = match self.selection.take() {
172 Some(selection) => Some(selection.and_then(&raw)),
173 None => Some(raw),
174 };
175 Ok(self)
176 }
177
178 pub fn build(mut self) -> ReadPlan {
180 if !self.selects_any() {
182 self.selection = Some(RowSelection::from(vec![]));
183 }
184
185 let selection_strategy = self.resolve_selection_strategy();
187
188 let Self {
189 batch_size,
190 selection,
191 row_selection_policy: _,
192 } = self;
193
194 let selection = selection.map(|s| s.trim());
195
196 let row_selection_cursor = selection
197 .map(|s| {
198 let trimmed = s.trim();
199 let selectors: Vec<RowSelector> = trimmed.into();
200 match selection_strategy {
201 RowSelectionStrategy::Mask => {
202 RowSelectionCursor::new_mask_from_selectors(selectors)
203 }
204 RowSelectionStrategy::Selectors => RowSelectionCursor::new_selectors(selectors),
205 }
206 })
207 .unwrap_or(RowSelectionCursor::new_all());
208
209 ReadPlan {
210 batch_size,
211 row_selection_cursor,
212 }
213 }
214}
215
216pub(crate) struct LimitedReadPlanBuilder {
220 inner: ReadPlanBuilder,
222 row_count: usize,
225 offset: Option<usize>,
227 limit: Option<usize>,
229}
230
231impl LimitedReadPlanBuilder {
232 fn new(inner: ReadPlanBuilder, row_count: usize) -> Self {
234 Self {
235 inner,
236 row_count,
237 offset: None,
238 limit: None,
239 }
240 }
241
242 pub(crate) fn with_offset(mut self, offset: Option<usize>) -> Self {
244 self.offset = offset;
245 self
246 }
247
248 pub(crate) fn with_limit(mut self, limit: Option<usize>) -> Self {
250 self.limit = limit;
251 self
252 }
253
254 pub(crate) fn build_limited(self) -> ReadPlanBuilder {
257 let Self {
258 mut inner,
259 row_count,
260 offset,
261 limit,
262 } = self;
263
264 if !inner.selects_any() {
266 inner.selection = Some(RowSelection::from(vec![]));
267 }
268
269 if let Some(offset) = offset {
271 inner.selection = Some(match row_count.checked_sub(offset) {
272 None => RowSelection::from(vec![]),
273 Some(remaining) => inner
274 .selection
275 .map(|selection| selection.offset(offset))
276 .unwrap_or_else(|| {
277 RowSelection::from(vec![
278 RowSelector::skip(offset),
279 RowSelector::select(remaining),
280 ])
281 }),
282 });
283 }
284
285 if let Some(limit) = limit {
287 inner.selection = Some(
288 inner
289 .selection
290 .map(|selection| selection.limit(limit))
291 .unwrap_or_else(|| {
292 RowSelection::from(vec![RowSelector::select(limit.min(row_count))])
293 }),
294 );
295 }
296
297 inner
298 }
299}
300
301#[derive(Debug)]
305pub struct ReadPlan {
306 batch_size: usize,
308 row_selection_cursor: RowSelectionCursor,
310}
311
312impl ReadPlan {
313 #[deprecated(since = "57.1.0", note = "Use `row_selection_cursor_mut` instead")]
315 pub fn selection_mut(&mut self) -> Option<&mut VecDeque<RowSelector>> {
316 if let RowSelectionCursor::Selectors(selectors_cursor) = &mut self.row_selection_cursor {
317 Some(selectors_cursor.selectors_mut())
318 } else {
319 None
320 }
321 }
322
323 pub fn row_selection_cursor_mut(&mut self) -> &mut RowSelectionCursor {
325 &mut self.row_selection_cursor
326 }
327
328 #[inline(always)]
330 pub fn batch_size(&self) -> usize {
331 self.batch_size
332 }
333}
334
#[cfg(test)]
mod tests {
    use super::*;

    /// Returns a builder with batch size 1024 and `selection` applied.
    fn builder_with_selection(selection: RowSelection) -> ReadPlanBuilder {
        let builder = ReadPlanBuilder::new(1024);
        builder.with_selection(Some(selection))
    }

    #[test]
    fn preferred_selection_strategy_prefers_mask_by_default() {
        // A single 8-row selector resolved under the default policy.
        let eight_rows = RowSelection::from(vec![RowSelector::select(8)]);
        let resolved = builder_with_selection(eight_rows).resolve_selection_strategy();
        assert_eq!(resolved, RowSelectionStrategy::Mask);
    }

    #[test]
    fn preferred_selection_strategy_prefers_selectors_when_threshold_small() {
        // With a threshold of 1, 8 rows in one selector is not below
        // `1 * threshold`, so selectors are chosen.
        let policy = RowSelectionPolicy::Auto { threshold: 1 };
        let eight_rows = RowSelection::from(vec![RowSelector::select(8)]);
        let resolved = builder_with_selection(eight_rows)
            .with_row_selection_policy(policy)
            .resolve_selection_strategy();
        assert_eq!(resolved, RowSelectionStrategy::Selectors);
    }
}