parquet/arrow/arrow_reader/
read_plan.rs1use crate::arrow::array_reader::ArrayReader;
22use crate::arrow::arrow_reader::selection::RowSelectionPolicy;
23use crate::arrow::arrow_reader::selection::RowSelectionStrategy;
24use crate::arrow::arrow_reader::{
25 ArrowPredicate, ParquetRecordBatchReader, RowSelection, RowSelectionCursor, RowSelector,
26};
27use crate::errors::{ParquetError, Result};
28use arrow_array::{Array, BooleanArray};
29use arrow_buffer::{BooleanBuffer, BooleanBufferBuilder};
30use arrow_select::filter::prep_null_mask_filter;
31use std::collections::VecDeque;
32
33pub struct PredicateOptions<'a> {
35 array_reader: Box<dyn ArrayReader>,
36 predicate: &'a mut dyn ArrowPredicate,
37 limit: Option<usize>,
38 total_rows: usize,
39}
40
41impl<'a> PredicateOptions<'a> {
42 pub fn new(array_reader: Box<dyn ArrayReader>, predicate: &'a mut dyn ArrowPredicate) -> Self {
49 Self {
50 array_reader,
51 predicate,
52 limit: None,
53 total_rows: 0,
54 }
55 }
56
57 pub fn with_limit(mut self, limit: usize, total_rows: usize) -> Self {
73 self.limit = Some(limit);
74 self.total_rows = total_rows;
75 self
76 }
77}
78
79#[derive(Clone, Debug)]
81pub struct ReadPlanBuilder {
82 batch_size: usize,
83 selection: Option<RowSelection>,
85 row_selection_policy: RowSelectionPolicy,
87}
88
89impl ReadPlanBuilder {
90 pub fn new(batch_size: usize) -> Self {
92 Self {
93 batch_size,
94 selection: None,
95 row_selection_policy: RowSelectionPolicy::default(),
96 }
97 }
98
99 pub fn with_selection(mut self, selection: Option<RowSelection>) -> Self {
101 self.selection = selection;
102 self
103 }
104
105 pub fn with_row_selection_policy(mut self, policy: RowSelectionPolicy) -> Self {
109 self.row_selection_policy = policy;
110 self
111 }
112
113 pub fn row_selection_policy(&self) -> &RowSelectionPolicy {
115 &self.row_selection_policy
116 }
117
118 pub fn selection(&self) -> Option<&RowSelection> {
120 self.selection.as_ref()
121 }
122
123 pub(crate) fn limited(self, row_count: usize) -> LimitedReadPlanBuilder {
131 LimitedReadPlanBuilder::new(self, row_count)
132 }
133
134 pub fn selects_any(&self) -> bool {
136 self.selection
137 .as_ref()
138 .map(|s| s.selects_any())
139 .unwrap_or(true)
140 }
141
142 pub fn num_rows_selected(&self) -> Option<usize> {
144 self.selection.as_ref().map(|s| s.row_count())
145 }
146
147 pub(crate) fn resolve_selection_strategy(&self) -> RowSelectionStrategy {
151 match self.row_selection_policy {
152 RowSelectionPolicy::Selectors => RowSelectionStrategy::Selectors,
153 RowSelectionPolicy::Mask => RowSelectionStrategy::Mask,
154 RowSelectionPolicy::Auto { threshold, .. } => {
155 let selection = match self.selection.as_ref() {
156 Some(selection) => selection,
157 None => return RowSelectionStrategy::Selectors,
158 };
159
160 let (total_rows, effective_count) =
163 selection.iter().fold((0usize, 0usize), |(rows, count), s| {
164 if s.row_count > 0 {
165 (rows + s.row_count, count + 1)
166 } else {
167 (rows, count)
168 }
169 });
170
171 if effective_count == 0 {
172 return RowSelectionStrategy::Mask;
173 }
174
175 if total_rows < effective_count.saturating_mul(threshold) {
176 RowSelectionStrategy::Mask
177 } else {
178 RowSelectionStrategy::Selectors
179 }
180 }
181 }
182 }
183
184 pub fn with_predicate(
194 self,
195 array_reader: Box<dyn ArrayReader>,
196 predicate: &mut dyn ArrowPredicate,
197 ) -> Result<Self> {
198 self.with_predicate_options(PredicateOptions::new(array_reader, predicate))
199 }
200
201 pub fn with_predicate_options(mut self, options: PredicateOptions<'_>) -> Result<Self> {
208 let PredicateOptions {
209 array_reader,
210 predicate,
211 limit,
212 total_rows,
213 } = options;
214
215 let expected_rows = match self.selection.as_ref() {
222 Some(s) => Some(s.row_count()),
223 None => limit.map(|_| total_rows),
224 };
225
226 let reader = ParquetRecordBatchReader::new(array_reader, self.clone().build());
227 let mut filters = vec![];
228 let mut processed_rows: usize = 0;
229 let mut matched_rows: usize = 0;
230 for maybe_batch in reader {
231 let maybe_batch = maybe_batch?;
232 let input_rows = maybe_batch.num_rows();
233 let filter = predicate.evaluate(maybe_batch)?;
234 if filter.len() != input_rows {
236 return Err(arrow_err!(
237 "ArrowPredicate predicate returned {} rows, expected {input_rows}",
238 filter.len()
239 ));
240 }
241 let filter = match filter.null_count() {
242 0 => filter,
243 _ => prep_null_mask_filter(&filter),
244 };
245
246 processed_rows += input_rows;
247
248 match limit {
249 Some(limit) if matched_rows + filter.true_count() >= limit => {
250 let needed = limit - matched_rows;
251 let truncated = truncate_filter_after_n_trues(filter, needed);
252 filters.push(truncated);
253 break;
254 }
255 _ => {
256 matched_rows += filter.true_count();
257 filters.push(filter);
258 }
259 }
260 }
261
262 if let Some(expected) = expected_rows {
268 if processed_rows < expected {
269 let pad_len = expected - processed_rows;
270 filters.push(BooleanArray::new(BooleanBuffer::new_unset(pad_len), None));
271 }
272 }
273
274 let all_selected = filters.iter().all(|f| f.true_count() == f.len());
278 if all_selected && self.selection.is_none() {
279 return Ok(self);
280 }
281 let raw = RowSelection::from_filters(&filters);
282 self.selection = match self.selection.take() {
283 Some(selection) => Some(selection.and_then(&raw)),
284 None => Some(raw),
285 };
286 Ok(self)
287 }
288
289 pub fn build(mut self) -> ReadPlan {
291 if !self.selects_any() {
293 self.selection = Some(RowSelection::from(vec![]));
294 }
295
296 let selection_strategy = self.resolve_selection_strategy();
298
299 let Self {
300 batch_size,
301 selection,
302 row_selection_policy: _,
303 } = self;
304
305 let selection = selection.map(|s| s.trim());
306
307 let row_selection_cursor = selection
308 .map(|s| {
309 let trimmed = s.trim();
310 let selectors: Vec<RowSelector> = trimmed.into();
311 match selection_strategy {
312 RowSelectionStrategy::Mask => {
313 RowSelectionCursor::new_mask_from_selectors(selectors)
314 }
315 RowSelectionStrategy::Selectors => RowSelectionCursor::new_selectors(selectors),
316 }
317 })
318 .unwrap_or(RowSelectionCursor::new_all());
319
320 ReadPlan {
321 batch_size,
322 row_selection_cursor,
323 }
324 }
325}
326
327pub(crate) struct LimitedReadPlanBuilder {
331 inner: ReadPlanBuilder,
333 row_count: usize,
336 offset: Option<usize>,
338 limit: Option<usize>,
340}
341
342impl LimitedReadPlanBuilder {
343 fn new(inner: ReadPlanBuilder, row_count: usize) -> Self {
345 Self {
346 inner,
347 row_count,
348 offset: None,
349 limit: None,
350 }
351 }
352
353 pub(crate) fn with_offset(mut self, offset: Option<usize>) -> Self {
355 self.offset = offset;
356 self
357 }
358
359 pub(crate) fn with_limit(mut self, limit: Option<usize>) -> Self {
361 self.limit = limit;
362 self
363 }
364
365 pub(crate) fn build_limited(self) -> ReadPlanBuilder {
368 let Self {
369 mut inner,
370 row_count,
371 offset,
372 limit,
373 } = self;
374
375 if !inner.selects_any() {
377 inner.selection = Some(RowSelection::from(vec![]));
378 }
379
380 if let Some(offset) = offset {
382 inner.selection = Some(match row_count.checked_sub(offset) {
383 None => RowSelection::from(vec![]),
384 Some(remaining) => inner
385 .selection
386 .map(|selection| selection.offset(offset))
387 .unwrap_or_else(|| {
388 RowSelection::from(vec![
389 RowSelector::skip(offset),
390 RowSelector::select(remaining),
391 ])
392 }),
393 });
394 }
395
396 if let Some(limit) = limit {
398 inner.selection = Some(
399 inner
400 .selection
401 .map(|selection| selection.limit(limit))
402 .unwrap_or_else(|| {
403 RowSelection::from(vec![RowSelector::select(limit.min(row_count))])
404 }),
405 );
406 }
407
408 inner
409 }
410}
411
412fn truncate_filter_after_n_trues(filter: BooleanArray, n: usize) -> BooleanArray {
419 if filter.true_count() <= n {
420 return filter;
421 }
422 let len = filter.len();
423 if n == 0 {
424 return BooleanArray::new(BooleanBuffer::new_unset(len), None);
425 }
426 let values = filter.values();
430 let last_kept = values
431 .set_indices()
432 .nth(n - 1)
433 .expect("n - 1 < true_count, checked above");
434
435 let mut builder = BooleanBufferBuilder::new(len);
436 builder.append_buffer(&values.slice(0, last_kept + 1));
437 builder.append_n(len - last_kept - 1, false);
438 BooleanArray::new(builder.finish(), None)
439}
440
441#[derive(Debug)]
445pub struct ReadPlan {
446 batch_size: usize,
448 row_selection_cursor: RowSelectionCursor,
450}
451
452impl ReadPlan {
453 #[deprecated(since = "57.1.0", note = "Use `row_selection_cursor_mut` instead")]
455 pub fn selection_mut(&mut self) -> Option<&mut VecDeque<RowSelector>> {
456 if let RowSelectionCursor::Selectors(selectors_cursor) = &mut self.row_selection_cursor {
457 Some(selectors_cursor.selectors_mut())
458 } else {
459 None
460 }
461 }
462
463 pub fn row_selection_cursor_mut(&mut self) -> &mut RowSelectionCursor {
465 &mut self.row_selection_cursor
466 }
467
468 #[inline(always)]
470 pub fn batch_size(&self) -> usize {
471 self.batch_size
472 }
473}
474
#[cfg(test)]
mod tests {
    use super::*;

    /// A builder with a 1024-row batch size and the supplied selection.
    fn builder_with_selection(selection: RowSelection) -> ReadPlanBuilder {
        ReadPlanBuilder::new(1024).with_selection(Some(selection))
    }

    #[test]
    fn preferred_selection_strategy_prefers_mask_by_default() {
        let single_run = RowSelection::from(vec![RowSelector::select(8)]);
        let strategy = builder_with_selection(single_run).resolve_selection_strategy();
        assert_eq!(strategy, RowSelectionStrategy::Mask);
    }

    #[test]
    fn preferred_selection_strategy_prefers_selectors_when_threshold_small() {
        let single_run = RowSelection::from(vec![RowSelector::select(8)]);
        let policy = RowSelectionPolicy::Auto { threshold: 1 };
        let strategy = builder_with_selection(single_run)
            .with_row_selection_policy(policy)
            .resolve_selection_strategy();
        assert_eq!(strategy, RowSelectionStrategy::Selectors);
    }

    #[test]
    fn truncate_filter_after_n_trues_keeps_first_n_matches() {
        let input = BooleanArray::from(vec![true, false, true, true, false, true, true]);
        let truncated = truncate_filter_after_n_trues(input.clone(), 3);

        assert_eq!(truncated.len(), input.len());
        assert_eq!(truncated.true_count(), 3);

        let observed: Vec<bool> = (0..truncated.len())
            .map(|idx| truncated.value(idx))
            .collect();
        assert_eq!(
            observed,
            vec![true, false, true, true, false, false, false],
            "first three trues should survive, the rest become false"
        );
    }

    #[test]
    fn truncate_filter_after_n_trues_passes_through_when_already_small_enough() {
        let input = BooleanArray::from(vec![true, false, true, false]);
        let truncated = truncate_filter_after_n_trues(input.clone(), 5);
        assert_eq!(truncated.len(), input.len());
        assert_eq!(truncated.true_count(), 2);
    }

    #[test]
    fn truncate_filter_after_n_trues_zero_returns_all_false() {
        let all_true = BooleanArray::from(vec![true, true, true]);
        let truncated = truncate_filter_after_n_trues(all_true, 0);
        assert_eq!(truncated.len(), 3);
        assert_eq!(truncated.true_count(), 0);
    }

    #[test]
    fn with_predicate_options_limit_pads_tail_when_no_prior_selection() {
        use crate::arrow::ProjectionMask;
        use crate::arrow::array_reader::StructArrayReader;
        use crate::arrow::array_reader::test_util::InMemoryArrayReader;
        use crate::arrow::arrow_reader::ArrowPredicateFn;
        use arrow_array::Int32Array;
        use arrow_schema::{DataType as ArrowType, Field, Fields};
        use std::sync::Arc;

        const TOTAL_ROWS: usize = 100;
        const LIMIT: usize = 10;

        // One non-nullable Int32 column holding 0..TOTAL_ROWS, wrapped in a struct reader.
        let column: Vec<i32> = (0..TOTAL_ROWS as i32).collect();
        let column = Arc::new(Int32Array::from(column));
        let leaf_reader = InMemoryArrayReader::new(ArrowType::Int32, column, None, None);
        let struct_type = ArrowType::Struct(Fields::from(vec![Field::new(
            "c0",
            ArrowType::Int32,
            false,
        )]));
        let struct_reader =
            StructArrayReader::new(struct_type, vec![Box::new(leaf_reader)], 0, 0, false);

        // Accept every row, so only the limit decides where evaluation stops.
        let mut accept_all = ArrowPredicateFn::new(ProjectionMask::all(), |batch| {
            Ok(BooleanArray::from(vec![true; batch.num_rows()]))
        });

        let options = PredicateOptions::new(Box::new(struct_reader), &mut accept_all)
            .with_limit(LIMIT, TOTAL_ROWS);
        let builder = ReadPlanBuilder::new(16)
            .with_predicate_options(options)
            .unwrap();

        let selection = builder
            .selection()
            .expect("limit-driven early break must produce a selection");

        assert_eq!(selection.row_count(), LIMIT);

        // Selected plus skipped rows must cover the whole input.
        let spanned_rows: usize = selection.iter().map(|s| s.row_count).sum();
        assert_eq!(
            spanned_rows, TOTAL_ROWS,
            "selection must span the full row group, not only the prefix evaluated before the limit"
        );
    }
}