1mod data;
19mod filter;
20
21use crate::arrow::ProjectionMask;
22use crate::arrow::array_reader::{ArrayReaderBuilder, CacheOptions, RowGroupCache};
23use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
24use crate::arrow::arrow_reader::selection::RowSelectionStrategy;
25use crate::arrow::arrow_reader::{
26 ParquetRecordBatchReader, PredicateOptions, ReadPlanBuilder, RowFilter, RowSelection,
27 RowSelectionPolicy,
28};
29use crate::arrow::in_memory_row_group::ColumnChunkData;
30use crate::arrow::push_decoder::reader_builder::data::DataRequestBuilder;
31use crate::arrow::push_decoder::reader_builder::filter::CacheInfo;
32use crate::arrow::schema::ParquetField;
33use crate::errors::ParquetError;
34use crate::file::metadata::ParquetMetaData;
35use crate::file::page_index::offset_index::OffsetIndexMetaData;
36use crate::util::push_buffers::PushBuffers;
37use bytes::Bytes;
38use data::DataRequest;
39use filter::AdvanceResult;
40use filter::FilterInfo;
41use std::ops::Range;
42use std::sync::{Arc, RwLock};
43
44#[derive(Debug)]
46struct RowGroupInfo {
47 row_group_idx: usize,
48 row_count: usize,
49 plan_builder: ReadPlanBuilder,
50 budget: RowBudget,
51}
52
53#[derive(Debug)]
55enum RowGroupDecoderState {
56 Start {
57 row_group_info: RowGroupInfo,
58 },
59 Filters {
61 row_group_info: RowGroupInfo,
62 column_chunks: Option<Vec<Option<Arc<ColumnChunkData>>>>,
64 filter_info: FilterInfo,
65 },
66 WaitingOnFilterData {
68 row_group_info: RowGroupInfo,
69 filter_info: FilterInfo,
70 data_request: DataRequest,
71 },
72 StartData {
74 row_group_info: RowGroupInfo,
75 column_chunks: Option<Vec<Option<Arc<ColumnChunkData>>>>,
77 cache_info: Option<CacheInfo>,
79 },
80 WaitingOnData {
82 row_group_info: RowGroupInfo,
83 data_request: DataRequest,
84 cache_info: Option<CacheInfo>,
86 },
87 Finished,
89}
90
/// Remaining OFFSET / LIMIT budget of a scan, threaded from row group to row group.
///
/// `offset` is the number of selected rows still to be skipped. `limit` is the
/// maximum number of rows still to be produced. `None` means "no offset" or
/// "no limit" respectively.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub(crate) struct RowBudget {
    offset: Option<usize>,
    limit: Option<usize>,
}
97
98impl RowBudget {
99 pub(crate) fn new(offset: Option<usize>, limit: Option<usize>) -> Self {
100 Self { offset, limit }
101 }
102
103 pub(crate) fn is_exhausted(self) -> bool {
104 matches!(self.limit, Some(0))
105 }
106
107 pub(crate) fn offset(self) -> Option<usize> {
109 self.offset
110 }
111
112 pub(crate) fn limit(self) -> Option<usize> {
114 self.limit
115 }
116
117 pub(crate) fn rows_after(self, rows_before_budget: usize) -> usize {
119 let rows_after_offset = rows_before_budget.saturating_sub(self.offset.unwrap_or(0));
120 match self.limit {
121 Some(limit) => rows_after_offset.min(limit),
122 None => rows_after_offset,
123 }
124 }
125
126 fn selected_row_limit(self) -> Option<usize> {
128 self.limit
129 .map(|limit| limit.saturating_add(self.offset.unwrap_or(0)))
130 }
131
132 fn apply_to_plan(self, plan_builder: ReadPlanBuilder, row_count: usize) -> BudgetedReadPlan {
133 let rows_before_budget = plan_builder.num_rows_selected().unwrap_or(row_count);
134 let plan_builder = plan_builder
135 .limited(row_count)
136 .with_offset(self.offset)
137 .with_limit(self.limit)
138 .build_limited();
139 let rows_after_budget = self.rows_after(rows_before_budget);
140
141 BudgetedReadPlan {
142 plan_builder,
143 rows_before_budget,
144 rows_after_budget,
145 remaining_budget: self.advance(rows_before_budget, rows_after_budget),
146 }
147 }
148
149 pub(crate) fn advance(mut self, rows_before_budget: usize, rows_after_budget: usize) -> Self {
155 if let Some(offset) = &mut self.offset {
156 *offset = offset.saturating_sub(rows_before_budget - rows_after_budget);
159 }
160
161 if rows_after_budget != 0 {
162 if let Some(limit) = &mut self.limit {
163 *limit -= rows_after_budget;
164 }
165 }
166
167 self
168 }
169}
170
171#[derive(Debug)]
172struct BudgetedReadPlan {
173 plan_builder: ReadPlanBuilder,
175 rows_before_budget: usize,
178 rows_after_budget: usize,
181 remaining_budget: RowBudget,
183}
184
185#[derive(Debug)]
186pub(crate) enum RowGroupBuildResult {
187 Finished {
189 remaining_budget: RowBudget,
191 },
192 NeedsData(Vec<Range<u64>>),
194 Data {
196 batch_reader: ParquetRecordBatchReader,
197 remaining_budget: RowBudget,
199 },
200}
201
202#[derive(Debug)]
204struct NextState {
205 next_state: RowGroupDecoderState,
206 result: Option<RowGroupBuildResult>,
211}
212
213impl NextState {
214 fn again(next_state: RowGroupDecoderState) -> Self {
218 Self {
219 next_state,
220 result: None,
221 }
222 }
223
224 fn result(next_state: RowGroupDecoderState, result: RowGroupBuildResult) -> Self {
226 Self {
227 next_state,
228 result: Some(result),
229 }
230 }
231}
232
233#[derive(Debug)]
239pub(crate) struct RowGroupReaderBuilder {
240 batch_size: usize,
242
243 projection: ProjectionMask,
245
246 metadata: Arc<ParquetMetaData>,
248
249 fields: Option<Arc<ParquetField>>,
251
252 filter: Option<RowFilter>,
254
255 max_predicate_cache_size: usize,
259
260 metrics: ArrowReaderMetrics,
262
263 row_selection_policy: RowSelectionPolicy,
265
266 state: Option<RowGroupDecoderState>,
271
272 buffers: PushBuffers,
274}
275
276#[derive(Debug)]
282pub(crate) struct RowGroupReaderBuilderParts {
283 pub batch_size: usize,
284 pub projection: ProjectionMask,
285 pub fields: Option<Arc<ParquetField>>,
286 pub filter: Option<RowFilter>,
287 pub max_predicate_cache_size: usize,
288 pub metrics: ArrowReaderMetrics,
289 pub row_selection_policy: RowSelectionPolicy,
290 pub buffers: PushBuffers,
293}
294
295impl RowGroupReaderBuilder {
296 #[expect(clippy::too_many_arguments)]
298 pub(crate) fn new(
299 batch_size: usize,
300 projection: ProjectionMask,
301 metadata: Arc<ParquetMetaData>,
302 fields: Option<Arc<ParquetField>>,
303 filter: Option<RowFilter>,
304 metrics: ArrowReaderMetrics,
305 max_predicate_cache_size: usize,
306 buffers: PushBuffers,
307 row_selection_policy: RowSelectionPolicy,
308 ) -> Self {
309 Self {
310 batch_size,
311 projection,
312 metadata,
313 fields,
314 filter,
315 metrics,
316 max_predicate_cache_size,
317 row_selection_policy,
318 state: Some(RowGroupDecoderState::Finished),
319 buffers,
320 }
321 }
322
323 pub(crate) fn into_parts(self) -> RowGroupReaderBuilderParts {
327 let Self {
330 batch_size,
331 projection,
332 metadata: _,
333 fields,
334 filter,
335 max_predicate_cache_size,
336 metrics,
337 row_selection_policy,
338 state: _,
339 buffers,
340 } = self;
341 RowGroupReaderBuilderParts {
342 batch_size,
343 projection,
344 fields,
345 filter,
346 max_predicate_cache_size,
347 metrics,
348 row_selection_policy,
349 buffers,
350 }
351 }
352
353 pub fn push_data(&mut self, ranges: Vec<Range<u64>>, buffers: Vec<Bytes>) {
355 self.buffers.push_ranges(ranges, buffers);
356 }
357
358 pub(crate) fn is_finished(&self) -> bool {
363 matches!(self.state, Some(RowGroupDecoderState::Finished))
364 }
365
366 pub fn buffered_bytes(&self) -> u64 {
368 self.buffers.buffered_bytes()
369 }
370
371 pub fn clear_all_ranges(&mut self) {
373 self.buffers.clear_all_ranges();
374 }
375
376 fn take_state(&mut self) -> Result<RowGroupDecoderState, ParquetError> {
384 self.state.take().ok_or_else(|| {
385 ParquetError::General(String::from(
386 "Internal Error: RowGroupReader in invalid state",
387 ))
388 })
389 }
390
391 pub(crate) fn has_active_row_group(&self) -> bool {
393 !matches!(self.state, Some(RowGroupDecoderState::Finished))
394 }
395
396 pub(crate) fn next_row_group(
398 &mut self,
399 row_group_idx: usize,
400 row_count: usize,
401 selection: Option<RowSelection>,
402 budget: RowBudget,
403 ) -> Result<(), ParquetError> {
404 let state = self.take_state()?;
405 if !matches!(state, RowGroupDecoderState::Finished) {
406 return Err(ParquetError::General(format!(
407 "Internal Error: next_row_group called while still reading a row group. Expected Finished state, got {state:?}"
408 )));
409 }
410 let plan_builder = ReadPlanBuilder::new(self.batch_size)
411 .with_selection(selection)
412 .with_row_selection_policy(self.row_selection_policy);
413
414 let row_group_info = RowGroupInfo {
415 row_group_idx,
416 row_count,
417 plan_builder,
418 budget,
419 };
420
421 self.state = Some(RowGroupDecoderState::Start { row_group_info });
422 Ok(())
423 }
424
425 pub(crate) fn try_build(&mut self) -> Result<RowGroupBuildResult, ParquetError> {
432 loop {
433 let current_state = self.take_state()?;
434 match self.try_transition(current_state)? {
436 NextState {
438 next_state,
439 result: Some(result),
440 } => {
441 self.state = Some(next_state);
443 return Ok(result);
444 }
445 NextState {
447 next_state,
448 result: None,
449 } => {
450 self.state = Some(next_state);
452 }
453 }
454 }
455 }
456
457 fn try_transition(
467 &mut self,
468 current_state: RowGroupDecoderState,
469 ) -> Result<NextState, ParquetError> {
470 let result = match current_state {
471 RowGroupDecoderState::Start { row_group_info } => {
472 debug_assert!(
473 !row_group_info.budget.is_exhausted(),
474 "RowGroupFrontier should not hand off row groups after the output limit is exhausted"
475 );
476
477 let column_chunks = None; let Some(filter) = self.filter.take() else {
480 return Ok(NextState::again(RowGroupDecoderState::StartData {
482 row_group_info,
483 column_chunks,
484 cache_info: None,
485 }));
486 };
487 if filter.predicates.is_empty() {
489 return Ok(NextState::again(RowGroupDecoderState::StartData {
490 row_group_info,
491 column_chunks,
492 cache_info: None,
493 }));
494 };
495
496 let cache_projection =
498 self.compute_cache_projection(row_group_info.row_group_idx, &filter);
499
500 let cache_info = CacheInfo::new(
501 cache_projection,
502 Arc::new(RwLock::new(RowGroupCache::new(
503 self.batch_size,
504 self.max_predicate_cache_size,
505 ))),
506 );
507
508 let filter_info = FilterInfo::new(filter, cache_info);
509 NextState::again(RowGroupDecoderState::Filters {
510 row_group_info,
511 filter_info,
512 column_chunks,
513 })
514 }
515 RowGroupDecoderState::Filters {
517 row_group_info,
518 column_chunks,
519 filter_info,
520 } => {
521 let RowGroupInfo {
522 row_group_idx,
523 row_count,
524 plan_builder,
525 budget,
526 } = row_group_info;
527
528 if !plan_builder.selects_any() {
530 self.filter = Some(filter_info.into_filter());
532 return Ok(NextState::result(
533 RowGroupDecoderState::Finished,
534 RowGroupBuildResult::Finished {
535 remaining_budget: budget,
536 },
537 ));
538 }
539
540 let predicate = filter_info.current();
542
543 let data_request = DataRequestBuilder::new(
546 row_group_idx,
547 row_count,
548 self.batch_size,
549 &self.metadata,
550 predicate.projection(), )
552 .with_selection(plan_builder.selection())
553 .with_cache_projection(Some(filter_info.cache_projection()))
555 .with_column_chunks(column_chunks)
556 .build();
557
558 let row_group_info = RowGroupInfo {
559 row_group_idx,
560 row_count,
561 plan_builder,
562 budget,
563 };
564
565 NextState::again(RowGroupDecoderState::WaitingOnFilterData {
566 row_group_info,
567 filter_info,
568 data_request,
569 })
570 }
571 RowGroupDecoderState::WaitingOnFilterData {
572 row_group_info,
573 data_request,
574 mut filter_info,
575 } => {
576 let needed_ranges = data_request.needed_ranges(&self.buffers);
578 if !needed_ranges.is_empty() {
579 return Ok(NextState::result(
581 RowGroupDecoderState::WaitingOnFilterData {
582 row_group_info,
583 filter_info,
584 data_request,
585 },
586 RowGroupBuildResult::NeedsData(needed_ranges),
587 ));
588 }
589
590 let RowGroupInfo {
592 row_group_idx,
593 row_count,
594 mut plan_builder,
595 budget,
596 } = row_group_info;
597
598 let predicate = filter_info.current();
599
600 let row_group = data_request.try_into_in_memory_row_group(
601 row_group_idx,
602 row_count,
603 &self.metadata,
604 predicate.projection(),
605 &mut self.buffers,
606 )?;
607
608 let cache_options = filter_info.cache_builder().producer();
609
610 let array_reader = ArrayReaderBuilder::new(&row_group, &self.metrics)
611 .with_batch_size(self.batch_size)
612 .with_cache_options(Some(&cache_options))
613 .with_parquet_metadata(&self.metadata)
614 .build_array_reader(self.fields.as_deref(), predicate.projection())?;
615
616 plan_builder = plan_builder.with_row_selection_policy(self.row_selection_policy);
622
623 plan_builder = override_selector_strategy_if_needed(
627 plan_builder,
628 predicate.projection(),
629 self.row_group_offset_index(row_group_idx),
630 );
631
632 let predicate_limit = filter_info
636 .is_last()
637 .then(|| budget.selected_row_limit())
638 .flatten();
639
640 let mut predicate_options =
644 PredicateOptions::new(array_reader, filter_info.current_mut());
645 if let Some(limit) = predicate_limit {
646 predicate_options = predicate_options.with_limit(limit, row_count);
647 }
648 plan_builder = plan_builder.with_predicate_options(predicate_options)?;
649
650 let row_group_info = RowGroupInfo {
651 row_group_idx,
652 row_count,
653 plan_builder,
654 budget,
655 };
656
657 let column_chunks = Some(row_group.column_chunks);
659
660 match filter_info.advance() {
662 AdvanceResult::Continue(filter_info) => {
663 NextState::again(RowGroupDecoderState::Filters {
664 row_group_info,
665 column_chunks,
666 filter_info,
667 })
668 }
669 AdvanceResult::Done(filter, cache_info) => {
671 assert!(self.filter.is_none());
673 self.filter = Some(filter);
674 NextState::again(RowGroupDecoderState::StartData {
675 row_group_info,
676 column_chunks,
677 cache_info: Some(cache_info),
678 })
679 }
680 }
681 }
682 RowGroupDecoderState::StartData {
683 row_group_info,
684 column_chunks,
685 cache_info,
686 } => {
687 let RowGroupInfo {
688 row_group_idx,
689 row_count,
690 plan_builder,
691 budget,
692 } = row_group_info;
693
694 let BudgetedReadPlan {
695 mut plan_builder,
696 rows_before_budget,
697 rows_after_budget,
698 remaining_budget,
699 } = budget.apply_to_plan(plan_builder, row_count);
700
701 if rows_before_budget == 0 {
702 return Ok(NextState::result(
704 RowGroupDecoderState::Finished,
705 RowGroupBuildResult::Finished { remaining_budget },
706 ));
707 }
708
709 if rows_after_budget == 0 {
710 return Ok(NextState::result(
712 RowGroupDecoderState::Finished,
713 RowGroupBuildResult::Finished { remaining_budget },
714 ));
715 }
716
717 let data_request = DataRequestBuilder::new(
718 row_group_idx,
719 row_count,
720 self.batch_size,
721 &self.metadata,
722 &self.projection,
723 )
724 .with_selection(plan_builder.selection())
725 .with_column_chunks(column_chunks)
726 .build();
729
730 plan_builder = plan_builder.with_row_selection_policy(self.row_selection_policy);
731
732 plan_builder = override_selector_strategy_if_needed(
733 plan_builder,
734 &self.projection,
735 self.row_group_offset_index(row_group_idx),
736 );
737
738 let row_group_info = RowGroupInfo {
739 row_group_idx,
740 row_count,
741 plan_builder,
742 budget: remaining_budget,
743 };
744
745 NextState::again(RowGroupDecoderState::WaitingOnData {
746 row_group_info,
747 data_request,
748 cache_info,
749 })
750 }
751 RowGroupDecoderState::WaitingOnData {
753 row_group_info,
754 data_request,
755 cache_info,
756 } => {
757 let needed_ranges = data_request.needed_ranges(&self.buffers);
758 if !needed_ranges.is_empty() {
759 return Ok(NextState::result(
761 RowGroupDecoderState::WaitingOnData {
762 row_group_info,
763 data_request,
764 cache_info,
765 },
766 RowGroupBuildResult::NeedsData(needed_ranges),
767 ));
768 }
769
770 let RowGroupInfo {
772 row_group_idx,
773 row_count,
774 plan_builder,
775 budget,
776 } = row_group_info;
777
778 let row_group = data_request.try_into_in_memory_row_group(
779 row_group_idx,
780 row_count,
781 &self.metadata,
782 &self.projection,
783 &mut self.buffers,
784 )?;
785
786 let plan = plan_builder.build();
787
788 let array_reader_builder = ArrayReaderBuilder::new(&row_group, &self.metrics)
790 .with_batch_size(self.batch_size)
791 .with_parquet_metadata(&self.metadata);
792 let array_reader = if let Some(cache_info) = cache_info.as_ref() {
793 let cache_options: CacheOptions = cache_info.builder().consumer();
794 array_reader_builder
795 .with_cache_options(Some(&cache_options))
796 .build_array_reader(self.fields.as_deref(), &self.projection)
797 } else {
798 array_reader_builder
799 .build_array_reader(self.fields.as_deref(), &self.projection)
800 }?;
801
802 let reader = ParquetRecordBatchReader::new(array_reader, plan);
803 NextState::result(
804 RowGroupDecoderState::Finished,
805 RowGroupBuildResult::Data {
806 batch_reader: reader,
807 remaining_budget: budget,
808 },
809 )
810 }
811 RowGroupDecoderState::Finished => {
812 return Err(ParquetError::General(String::from(
813 "Internal Error: try_build called without an active row group",
814 )));
815 }
816 };
817 Ok(result)
818 }
819
820 fn compute_cache_projection(&self, row_group_idx: usize, filter: &RowFilter) -> ProjectionMask {
825 let meta = self.metadata.row_group(row_group_idx);
826 match self.compute_cache_projection_inner(filter) {
827 Some(projection) => projection,
828 None => ProjectionMask::none(meta.columns().len()),
829 }
830 }
831
832 fn compute_cache_projection_inner(&self, filter: &RowFilter) -> Option<ProjectionMask> {
833 if self.max_predicate_cache_size == 0 {
835 return None;
836 }
837 let mut cache_projection = filter.predicates.first()?.projection().clone();
838 for predicate in filter.predicates.iter() {
839 cache_projection.union(predicate.projection());
840 }
841 cache_projection.intersect(&self.projection);
842 self.exclude_nested_columns_from_cache(&cache_projection)
843 }
844
845 fn exclude_nested_columns_from_cache(&self, mask: &ProjectionMask) -> Option<ProjectionMask> {
847 mask.without_nested_types(self.metadata.file_metadata().schema_descr())
848 }
849
850 fn row_group_offset_index(&self, row_group_idx: usize) -> Option<&[OffsetIndexMetaData]> {
852 self.metadata
853 .offset_index()
854 .filter(|index| !index.is_empty())
855 .and_then(|index| index.get(row_group_idx))
856 .map(|columns| columns.as_slice())
857 }
858}
859
860fn override_selector_strategy_if_needed(
880 plan_builder: ReadPlanBuilder,
881 projection_mask: &ProjectionMask,
882 offset_index: Option<&[OffsetIndexMetaData]>,
883) -> ReadPlanBuilder {
884 let RowSelectionPolicy::Auto { .. } = plan_builder.row_selection_policy() else {
886 return plan_builder;
887 };
888
889 let preferred_strategy = plan_builder.resolve_selection_strategy();
890
891 let force_selectors = matches!(preferred_strategy, RowSelectionStrategy::Mask)
892 && plan_builder.selection().is_some_and(|selection| {
893 selection.should_force_selectors(projection_mask, offset_index)
894 });
895
896 let resolved_strategy = if force_selectors {
897 RowSelectionStrategy::Selectors
898 } else {
899 preferred_strategy
900 };
901
902 let new_policy = match resolved_strategy {
904 RowSelectionStrategy::Mask => RowSelectionPolicy::Mask,
905 RowSelectionStrategy::Selectors => RowSelectionPolicy::Selectors,
906 };
907
908 plan_builder.with_row_selection_policy(new_policy)
909}
910
#[cfg(test)]
mod tests {
    use super::*;
    use crate::arrow::arrow_reader::{RowSelection, RowSelector};

    /// Applies `budget` to a plan without an explicit selection over `row_count` rows.
    fn budget_over_all_rows(budget: RowBudget, row_count: usize) -> BudgetedReadPlan {
        budget.apply_to_plan(ReadPlanBuilder::new(1024), row_count)
    }

    #[test]
    fn test_structure_size() {
        // Guards against accidentally growing the per-row-group state machine.
        assert_eq!(std::mem::size_of::<RowGroupDecoderState>(), 232);
    }

    #[test]
    fn test_row_budget_offset_limit_across_row_groups() {
        // The 225-row offset swallows the whole first row group...
        let first = budget_over_all_rows(RowBudget::new(Some(225), Some(20)), 200);
        assert_eq!(first.rows_before_budget, 200);
        assert_eq!(first.rows_after_budget, 0);
        assert_eq!(first.remaining_budget, RowBudget::new(Some(25), Some(20)));
        assert_eq!(first.plan_builder.num_rows_selected(), Some(0));

        // ...leaving 25 rows to skip and 20 to emit from the second one.
        let second = budget_over_all_rows(first.remaining_budget, 200);
        assert_eq!(second.rows_before_budget, 200);
        assert_eq!(second.rows_after_budget, 20);
        assert_eq!(second.remaining_budget, RowBudget::new(Some(0), Some(0)));
        assert_eq!(second.plan_builder.num_rows_selected(), Some(20));
    }

    #[test]
    fn test_row_budget_limit_only() {
        let budgeted = budget_over_all_rows(RowBudget::new(None, Some(20)), 200);
        assert_eq!(budgeted.rows_before_budget, 200);
        assert_eq!(budgeted.rows_after_budget, 20);
        assert_eq!(budgeted.remaining_budget, RowBudget::new(None, Some(0)));
        assert_eq!(budgeted.plan_builder.num_rows_selected(), Some(20));
    }

    #[test]
    fn test_row_budget_empty_selection() {
        // A selection that skips every row consumes none of the budget.
        let skip_everything = RowSelection::from(vec![RowSelector::skip(200)]);
        let plan = ReadPlanBuilder::new(1024).with_selection(Some(skip_everything));
        let budgeted = RowBudget::new(Some(10), Some(20)).apply_to_plan(plan, 200);

        assert_eq!(budgeted.rows_before_budget, 0);
        assert_eq!(budgeted.rows_after_budget, 0);
        assert_eq!(
            budgeted.remaining_budget,
            RowBudget::new(Some(10), Some(20))
        );
        assert_eq!(budgeted.plan_builder.num_rows_selected(), Some(0));
    }
}