1mod data;
19mod filter;
20
21use crate::arrow::ProjectionMask;
22use crate::arrow::array_reader::{ArrayReaderBuilder, CacheOptions, RowGroupCache};
23use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
24use crate::arrow::arrow_reader::selection::RowSelectionStrategy;
25use crate::arrow::arrow_reader::{
26 ParquetRecordBatchReader, PredicateOptions, ReadPlanBuilder, RowFilter, RowSelection,
27 RowSelectionPolicy,
28};
29use crate::arrow::in_memory_row_group::ColumnChunkData;
30use crate::arrow::push_decoder::reader_builder::data::DataRequestBuilder;
31use crate::arrow::push_decoder::reader_builder::filter::CacheInfo;
32use crate::arrow::schema::ParquetField;
33use crate::errors::ParquetError;
34use crate::file::metadata::ParquetMetaData;
35use crate::file::page_index::offset_index::OffsetIndexMetaData;
36use crate::util::push_buffers::PushBuffers;
37use bytes::Bytes;
38use data::DataRequest;
39use filter::AdvanceResult;
40use filter::FilterInfo;
41use std::ops::Range;
42use std::sync::{Arc, RwLock};
43
44#[derive(Debug)]
46struct RowGroupInfo {
47 row_group_idx: usize,
48 row_count: usize,
49 plan_builder: ReadPlanBuilder,
50 budget: RowBudget,
51}
52
53#[derive(Debug)]
55enum RowGroupDecoderState {
56 Start {
57 row_group_info: RowGroupInfo,
58 },
59 Filters {
61 row_group_info: RowGroupInfo,
62 column_chunks: Option<Vec<Option<Arc<ColumnChunkData>>>>,
64 filter_info: FilterInfo,
65 },
66 WaitingOnFilterData {
68 row_group_info: RowGroupInfo,
69 filter_info: FilterInfo,
70 data_request: DataRequest,
71 },
72 StartData {
74 row_group_info: RowGroupInfo,
75 column_chunks: Option<Vec<Option<Arc<ColumnChunkData>>>>,
77 cache_info: Option<CacheInfo>,
79 },
80 WaitingOnData {
82 row_group_info: RowGroupInfo,
83 data_request: DataRequest,
84 cache_info: Option<CacheInfo>,
86 },
87 Finished,
89}
90
/// Output offset/limit that is carried from one row group to the next.
///
/// `None` for either field means "no offset" / "no limit" respectively.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub(crate) struct RowBudget {
    /// Selected rows that still have to be skipped before any are emitted.
    offset: Option<usize>,
    /// Rows that may still be emitted.
    limit: Option<usize>,
}
97
98impl RowBudget {
99 pub(crate) fn new(offset: Option<usize>, limit: Option<usize>) -> Self {
100 Self { offset, limit }
101 }
102
103 pub(crate) fn is_exhausted(self) -> bool {
104 matches!(self.limit, Some(0))
105 }
106
107 pub(crate) fn rows_after(self, rows_before_budget: usize) -> usize {
109 let rows_after_offset = rows_before_budget.saturating_sub(self.offset.unwrap_or(0));
110 match self.limit {
111 Some(limit) => rows_after_offset.min(limit),
112 None => rows_after_offset,
113 }
114 }
115
116 fn selected_row_limit(self) -> Option<usize> {
118 self.limit
119 .map(|limit| limit.saturating_add(self.offset.unwrap_or(0)))
120 }
121
122 fn apply_to_plan(self, plan_builder: ReadPlanBuilder, row_count: usize) -> BudgetedReadPlan {
123 let rows_before_budget = plan_builder.num_rows_selected().unwrap_or(row_count);
124 let plan_builder = plan_builder
125 .limited(row_count)
126 .with_offset(self.offset)
127 .with_limit(self.limit)
128 .build_limited();
129 let rows_after_budget = self.rows_after(rows_before_budget);
130
131 BudgetedReadPlan {
132 plan_builder,
133 rows_before_budget,
134 rows_after_budget,
135 remaining_budget: self.advance(rows_before_budget, rows_after_budget),
136 }
137 }
138
139 pub(crate) fn advance(mut self, rows_before_budget: usize, rows_after_budget: usize) -> Self {
145 if let Some(offset) = &mut self.offset {
146 *offset = offset.saturating_sub(rows_before_budget - rows_after_budget);
149 }
150
151 if rows_after_budget != 0 {
152 if let Some(limit) = &mut self.limit {
153 *limit -= rows_after_budget;
154 }
155 }
156
157 self
158 }
159}
160
161#[derive(Debug)]
162struct BudgetedReadPlan {
163 plan_builder: ReadPlanBuilder,
165 rows_before_budget: usize,
168 rows_after_budget: usize,
171 remaining_budget: RowBudget,
173}
174
175#[derive(Debug)]
176pub(crate) enum RowGroupBuildResult {
177 Finished {
179 remaining_budget: RowBudget,
181 },
182 NeedsData(Vec<Range<u64>>),
184 Data {
186 batch_reader: ParquetRecordBatchReader,
187 remaining_budget: RowBudget,
189 },
190}
191
192#[derive(Debug)]
194struct NextState {
195 next_state: RowGroupDecoderState,
196 result: Option<RowGroupBuildResult>,
201}
202
203impl NextState {
204 fn again(next_state: RowGroupDecoderState) -> Self {
208 Self {
209 next_state,
210 result: None,
211 }
212 }
213
214 fn result(next_state: RowGroupDecoderState, result: RowGroupBuildResult) -> Self {
216 Self {
217 next_state,
218 result: Some(result),
219 }
220 }
221}
222
223#[derive(Debug)]
229pub(crate) struct RowGroupReaderBuilder {
230 batch_size: usize,
232
233 projection: ProjectionMask,
235
236 metadata: Arc<ParquetMetaData>,
238
239 fields: Option<Arc<ParquetField>>,
241
242 filter: Option<RowFilter>,
244
245 max_predicate_cache_size: usize,
249
250 metrics: ArrowReaderMetrics,
252
253 row_selection_policy: RowSelectionPolicy,
255
256 state: Option<RowGroupDecoderState>,
261
262 buffers: PushBuffers,
264}
265
266impl RowGroupReaderBuilder {
267 #[expect(clippy::too_many_arguments)]
269 pub(crate) fn new(
270 batch_size: usize,
271 projection: ProjectionMask,
272 metadata: Arc<ParquetMetaData>,
273 fields: Option<Arc<ParquetField>>,
274 filter: Option<RowFilter>,
275 metrics: ArrowReaderMetrics,
276 max_predicate_cache_size: usize,
277 buffers: PushBuffers,
278 row_selection_policy: RowSelectionPolicy,
279 ) -> Self {
280 Self {
281 batch_size,
282 projection,
283 metadata,
284 fields,
285 filter,
286 metrics,
287 max_predicate_cache_size,
288 row_selection_policy,
289 state: Some(RowGroupDecoderState::Finished),
290 buffers,
291 }
292 }
293
294 pub fn push_data(&mut self, ranges: Vec<Range<u64>>, buffers: Vec<Bytes>) {
296 self.buffers.push_ranges(ranges, buffers);
297 }
298
299 pub fn buffered_bytes(&self) -> u64 {
301 self.buffers.buffered_bytes()
302 }
303
304 pub fn clear_all_ranges(&mut self) {
306 self.buffers.clear_all_ranges();
307 }
308
309 fn take_state(&mut self) -> Result<RowGroupDecoderState, ParquetError> {
317 self.state.take().ok_or_else(|| {
318 ParquetError::General(String::from(
319 "Internal Error: RowGroupReader in invalid state",
320 ))
321 })
322 }
323
324 pub(crate) fn has_active_row_group(&self) -> bool {
326 !matches!(self.state, Some(RowGroupDecoderState::Finished))
327 }
328
329 pub(crate) fn next_row_group(
331 &mut self,
332 row_group_idx: usize,
333 row_count: usize,
334 selection: Option<RowSelection>,
335 budget: RowBudget,
336 ) -> Result<(), ParquetError> {
337 let state = self.take_state()?;
338 if !matches!(state, RowGroupDecoderState::Finished) {
339 return Err(ParquetError::General(format!(
340 "Internal Error: next_row_group called while still reading a row group. Expected Finished state, got {state:?}"
341 )));
342 }
343 let plan_builder = ReadPlanBuilder::new(self.batch_size)
344 .with_selection(selection)
345 .with_row_selection_policy(self.row_selection_policy);
346
347 let row_group_info = RowGroupInfo {
348 row_group_idx,
349 row_count,
350 plan_builder,
351 budget,
352 };
353
354 self.state = Some(RowGroupDecoderState::Start { row_group_info });
355 Ok(())
356 }
357
358 pub(crate) fn try_build(&mut self) -> Result<RowGroupBuildResult, ParquetError> {
365 loop {
366 let current_state = self.take_state()?;
367 match self.try_transition(current_state)? {
369 NextState {
371 next_state,
372 result: Some(result),
373 } => {
374 self.state = Some(next_state);
376 return Ok(result);
377 }
378 NextState {
380 next_state,
381 result: None,
382 } => {
383 self.state = Some(next_state);
385 }
386 }
387 }
388 }
389
390 fn try_transition(
400 &mut self,
401 current_state: RowGroupDecoderState,
402 ) -> Result<NextState, ParquetError> {
403 let result = match current_state {
404 RowGroupDecoderState::Start { row_group_info } => {
405 debug_assert!(
406 !row_group_info.budget.is_exhausted(),
407 "RowGroupFrontier should not hand off row groups after the output limit is exhausted"
408 );
409
410 let column_chunks = None; let Some(filter) = self.filter.take() else {
413 return Ok(NextState::again(RowGroupDecoderState::StartData {
415 row_group_info,
416 column_chunks,
417 cache_info: None,
418 }));
419 };
420 if filter.predicates.is_empty() {
422 return Ok(NextState::again(RowGroupDecoderState::StartData {
423 row_group_info,
424 column_chunks,
425 cache_info: None,
426 }));
427 };
428
429 let cache_projection =
431 self.compute_cache_projection(row_group_info.row_group_idx, &filter);
432
433 let cache_info = CacheInfo::new(
434 cache_projection,
435 Arc::new(RwLock::new(RowGroupCache::new(
436 self.batch_size,
437 self.max_predicate_cache_size,
438 ))),
439 );
440
441 let filter_info = FilterInfo::new(filter, cache_info);
442 NextState::again(RowGroupDecoderState::Filters {
443 row_group_info,
444 filter_info,
445 column_chunks,
446 })
447 }
448 RowGroupDecoderState::Filters {
450 row_group_info,
451 column_chunks,
452 filter_info,
453 } => {
454 let RowGroupInfo {
455 row_group_idx,
456 row_count,
457 plan_builder,
458 budget,
459 } = row_group_info;
460
461 if !plan_builder.selects_any() {
463 self.filter = Some(filter_info.into_filter());
465 return Ok(NextState::result(
466 RowGroupDecoderState::Finished,
467 RowGroupBuildResult::Finished {
468 remaining_budget: budget,
469 },
470 ));
471 }
472
473 let predicate = filter_info.current();
475
476 let data_request = DataRequestBuilder::new(
479 row_group_idx,
480 row_count,
481 self.batch_size,
482 &self.metadata,
483 predicate.projection(), )
485 .with_selection(plan_builder.selection())
486 .with_cache_projection(Some(filter_info.cache_projection()))
488 .with_column_chunks(column_chunks)
489 .build();
490
491 let row_group_info = RowGroupInfo {
492 row_group_idx,
493 row_count,
494 plan_builder,
495 budget,
496 };
497
498 NextState::again(RowGroupDecoderState::WaitingOnFilterData {
499 row_group_info,
500 filter_info,
501 data_request,
502 })
503 }
504 RowGroupDecoderState::WaitingOnFilterData {
505 row_group_info,
506 data_request,
507 mut filter_info,
508 } => {
509 let needed_ranges = data_request.needed_ranges(&self.buffers);
511 if !needed_ranges.is_empty() {
512 return Ok(NextState::result(
514 RowGroupDecoderState::WaitingOnFilterData {
515 row_group_info,
516 filter_info,
517 data_request,
518 },
519 RowGroupBuildResult::NeedsData(needed_ranges),
520 ));
521 }
522
523 let RowGroupInfo {
525 row_group_idx,
526 row_count,
527 mut plan_builder,
528 budget,
529 } = row_group_info;
530
531 let predicate = filter_info.current();
532
533 let row_group = data_request.try_into_in_memory_row_group(
534 row_group_idx,
535 row_count,
536 &self.metadata,
537 predicate.projection(),
538 &mut self.buffers,
539 )?;
540
541 let cache_options = filter_info.cache_builder().producer();
542
543 let array_reader = ArrayReaderBuilder::new(&row_group, &self.metrics)
544 .with_batch_size(self.batch_size)
545 .with_cache_options(Some(&cache_options))
546 .with_parquet_metadata(&self.metadata)
547 .build_array_reader(self.fields.as_deref(), predicate.projection())?;
548
549 plan_builder = plan_builder.with_row_selection_policy(self.row_selection_policy);
555
556 plan_builder = override_selector_strategy_if_needed(
560 plan_builder,
561 predicate.projection(),
562 self.row_group_offset_index(row_group_idx),
563 );
564
565 let predicate_limit = filter_info
569 .is_last()
570 .then(|| budget.selected_row_limit())
571 .flatten();
572
573 let mut predicate_options =
577 PredicateOptions::new(array_reader, filter_info.current_mut());
578 if let Some(limit) = predicate_limit {
579 predicate_options = predicate_options.with_limit(limit, row_count);
580 }
581 plan_builder = plan_builder.with_predicate_options(predicate_options)?;
582
583 let row_group_info = RowGroupInfo {
584 row_group_idx,
585 row_count,
586 plan_builder,
587 budget,
588 };
589
590 let column_chunks = Some(row_group.column_chunks);
592
593 match filter_info.advance() {
595 AdvanceResult::Continue(filter_info) => {
596 NextState::again(RowGroupDecoderState::Filters {
597 row_group_info,
598 column_chunks,
599 filter_info,
600 })
601 }
602 AdvanceResult::Done(filter, cache_info) => {
604 assert!(self.filter.is_none());
606 self.filter = Some(filter);
607 NextState::again(RowGroupDecoderState::StartData {
608 row_group_info,
609 column_chunks,
610 cache_info: Some(cache_info),
611 })
612 }
613 }
614 }
615 RowGroupDecoderState::StartData {
616 row_group_info,
617 column_chunks,
618 cache_info,
619 } => {
620 let RowGroupInfo {
621 row_group_idx,
622 row_count,
623 plan_builder,
624 budget,
625 } = row_group_info;
626
627 let BudgetedReadPlan {
628 mut plan_builder,
629 rows_before_budget,
630 rows_after_budget,
631 remaining_budget,
632 } = budget.apply_to_plan(plan_builder, row_count);
633
634 if rows_before_budget == 0 {
635 return Ok(NextState::result(
637 RowGroupDecoderState::Finished,
638 RowGroupBuildResult::Finished { remaining_budget },
639 ));
640 }
641
642 if rows_after_budget == 0 {
643 return Ok(NextState::result(
645 RowGroupDecoderState::Finished,
646 RowGroupBuildResult::Finished { remaining_budget },
647 ));
648 }
649
650 let data_request = DataRequestBuilder::new(
651 row_group_idx,
652 row_count,
653 self.batch_size,
654 &self.metadata,
655 &self.projection,
656 )
657 .with_selection(plan_builder.selection())
658 .with_column_chunks(column_chunks)
659 .build();
662
663 plan_builder = plan_builder.with_row_selection_policy(self.row_selection_policy);
664
665 plan_builder = override_selector_strategy_if_needed(
666 plan_builder,
667 &self.projection,
668 self.row_group_offset_index(row_group_idx),
669 );
670
671 let row_group_info = RowGroupInfo {
672 row_group_idx,
673 row_count,
674 plan_builder,
675 budget: remaining_budget,
676 };
677
678 NextState::again(RowGroupDecoderState::WaitingOnData {
679 row_group_info,
680 data_request,
681 cache_info,
682 })
683 }
684 RowGroupDecoderState::WaitingOnData {
686 row_group_info,
687 data_request,
688 cache_info,
689 } => {
690 let needed_ranges = data_request.needed_ranges(&self.buffers);
691 if !needed_ranges.is_empty() {
692 return Ok(NextState::result(
694 RowGroupDecoderState::WaitingOnData {
695 row_group_info,
696 data_request,
697 cache_info,
698 },
699 RowGroupBuildResult::NeedsData(needed_ranges),
700 ));
701 }
702
703 let RowGroupInfo {
705 row_group_idx,
706 row_count,
707 plan_builder,
708 budget,
709 } = row_group_info;
710
711 let row_group = data_request.try_into_in_memory_row_group(
712 row_group_idx,
713 row_count,
714 &self.metadata,
715 &self.projection,
716 &mut self.buffers,
717 )?;
718
719 let plan = plan_builder.build();
720
721 let array_reader_builder = ArrayReaderBuilder::new(&row_group, &self.metrics)
723 .with_batch_size(self.batch_size)
724 .with_parquet_metadata(&self.metadata);
725 let array_reader = if let Some(cache_info) = cache_info.as_ref() {
726 let cache_options: CacheOptions = cache_info.builder().consumer();
727 array_reader_builder
728 .with_cache_options(Some(&cache_options))
729 .build_array_reader(self.fields.as_deref(), &self.projection)
730 } else {
731 array_reader_builder
732 .build_array_reader(self.fields.as_deref(), &self.projection)
733 }?;
734
735 let reader = ParquetRecordBatchReader::new(array_reader, plan);
736 NextState::result(
737 RowGroupDecoderState::Finished,
738 RowGroupBuildResult::Data {
739 batch_reader: reader,
740 remaining_budget: budget,
741 },
742 )
743 }
744 RowGroupDecoderState::Finished => {
745 return Err(ParquetError::General(String::from(
746 "Internal Error: try_build called without an active row group",
747 )));
748 }
749 };
750 Ok(result)
751 }
752
753 fn compute_cache_projection(&self, row_group_idx: usize, filter: &RowFilter) -> ProjectionMask {
758 let meta = self.metadata.row_group(row_group_idx);
759 match self.compute_cache_projection_inner(filter) {
760 Some(projection) => projection,
761 None => ProjectionMask::none(meta.columns().len()),
762 }
763 }
764
765 fn compute_cache_projection_inner(&self, filter: &RowFilter) -> Option<ProjectionMask> {
766 if self.max_predicate_cache_size == 0 {
768 return None;
769 }
770 let mut cache_projection = filter.predicates.first()?.projection().clone();
771 for predicate in filter.predicates.iter() {
772 cache_projection.union(predicate.projection());
773 }
774 cache_projection.intersect(&self.projection);
775 self.exclude_nested_columns_from_cache(&cache_projection)
776 }
777
778 fn exclude_nested_columns_from_cache(&self, mask: &ProjectionMask) -> Option<ProjectionMask> {
780 mask.without_nested_types(self.metadata.file_metadata().schema_descr())
781 }
782
783 fn row_group_offset_index(&self, row_group_idx: usize) -> Option<&[OffsetIndexMetaData]> {
785 self.metadata
786 .offset_index()
787 .filter(|index| !index.is_empty())
788 .and_then(|index| index.get(row_group_idx))
789 .map(|columns| columns.as_slice())
790 }
791}
792
793fn override_selector_strategy_if_needed(
813 plan_builder: ReadPlanBuilder,
814 projection_mask: &ProjectionMask,
815 offset_index: Option<&[OffsetIndexMetaData]>,
816) -> ReadPlanBuilder {
817 let RowSelectionPolicy::Auto { .. } = plan_builder.row_selection_policy() else {
819 return plan_builder;
820 };
821
822 let preferred_strategy = plan_builder.resolve_selection_strategy();
823
824 let force_selectors = matches!(preferred_strategy, RowSelectionStrategy::Mask)
825 && plan_builder.selection().is_some_and(|selection| {
826 selection.should_force_selectors(projection_mask, offset_index)
827 });
828
829 let resolved_strategy = if force_selectors {
830 RowSelectionStrategy::Selectors
831 } else {
832 preferred_strategy
833 };
834
835 let new_policy = match resolved_strategy {
837 RowSelectionStrategy::Mask => RowSelectionPolicy::Mask,
838 RowSelectionStrategy::Selectors => RowSelectionPolicy::Selectors,
839 };
840
841 plan_builder.with_row_selection_policy(new_policy)
842}
843
#[cfg(test)]
mod tests {
    use super::*;
    use crate::arrow::arrow_reader::{RowSelection, RowSelector};

    /// Rows in every synthetic row group used below.
    const ROWS_PER_GROUP: usize = 200;

    /// Applies `budget` to `plan` for a row group of `ROWS_PER_GROUP` rows.
    fn budgeted(budget: RowBudget, plan: ReadPlanBuilder) -> BudgetedReadPlan {
        budget.apply_to_plan(plan, ROWS_PER_GROUP)
    }

    /// A fresh plan with batch size 1024 and no selection.
    fn fresh_plan() -> ReadPlanBuilder {
        ReadPlanBuilder::new(1024)
    }

    #[test]
    fn test_structure_size() {
        // Guards against accidental growth of the per-row-group state.
        assert_eq!(std::mem::size_of::<RowGroupDecoderState>(), 232);
    }

    #[test]
    fn test_row_budget_offset_limit_across_row_groups() {
        // An offset of 225 swallows the whole first row group...
        let first = budgeted(RowBudget::new(Some(225), Some(20)), fresh_plan());
        assert_eq!(first.rows_before_budget, 200);
        assert_eq!(first.rows_after_budget, 0);
        assert_eq!(first.remaining_budget, RowBudget::new(Some(25), Some(20)));
        assert_eq!(first.plan_builder.num_rows_selected(), Some(0));

        // ...and the leftover offset of 25 plus the full limit apply to the next one.
        let second = budgeted(first.remaining_budget, fresh_plan());
        assert_eq!(second.rows_before_budget, 200);
        assert_eq!(second.rows_after_budget, 20);
        assert_eq!(second.remaining_budget, RowBudget::new(Some(0), Some(0)));
        assert_eq!(second.plan_builder.num_rows_selected(), Some(20));
    }

    #[test]
    fn test_row_budget_limit_only() {
        let limited = budgeted(RowBudget::new(None, Some(20)), fresh_plan());
        assert_eq!(limited.rows_before_budget, 200);
        assert_eq!(limited.rows_after_budget, 20);
        assert_eq!(limited.remaining_budget, RowBudget::new(None, Some(0)));
        assert_eq!(limited.plan_builder.num_rows_selected(), Some(20));
    }

    #[test]
    fn test_row_budget_empty_selection() {
        // A selection that skips every row leaves the budget untouched.
        let skip_everything = RowSelection::from(vec![RowSelector::skip(200)]);
        let empty = budgeted(
            RowBudget::new(Some(10), Some(20)),
            fresh_plan().with_selection(Some(skip_everything)),
        );
        assert_eq!(empty.rows_before_budget, 0);
        assert_eq!(empty.rows_after_budget, 0);
        assert_eq!(empty.remaining_budget, RowBudget::new(Some(10), Some(20)));
        assert_eq!(empty.plan_builder.num_rows_selected(), Some(0));
    }
}