Skip to main content

parquet/arrow/push_decoder/reader_builder/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18mod data;
19mod filter;
20
21use crate::arrow::ProjectionMask;
22use crate::arrow::array_reader::{ArrayReaderBuilder, CacheOptions, RowGroupCache};
23use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
24use crate::arrow::arrow_reader::selection::RowSelectionStrategy;
25use crate::arrow::arrow_reader::{
26    ParquetRecordBatchReader, PredicateOptions, ReadPlanBuilder, RowFilter, RowSelection,
27    RowSelectionPolicy,
28};
29use crate::arrow::in_memory_row_group::ColumnChunkData;
30use crate::arrow::push_decoder::reader_builder::data::DataRequestBuilder;
31use crate::arrow::push_decoder::reader_builder::filter::CacheInfo;
32use crate::arrow::schema::ParquetField;
33use crate::errors::ParquetError;
34use crate::file::metadata::ParquetMetaData;
35use crate::file::page_index::offset_index::OffsetIndexMetaData;
36use crate::util::push_buffers::PushBuffers;
37use bytes::Bytes;
38use data::DataRequest;
39use filter::AdvanceResult;
40use filter::FilterInfo;
41use std::ops::Range;
42use std::sync::{Arc, RwLock};
43
/// The current row group being read, its read plan, and its offset/limit budget.
///
/// Carried from state to state by every active variant of
/// [`RowGroupDecoderState`] while a row group is being decoded.
#[derive(Debug)]
struct RowGroupInfo {
    /// Index of the row group within the file metadata.
    row_group_idx: usize,
    /// Number of rows in this row group.
    row_count: usize,
    /// Read plan, refined by the row selection and by each evaluated predicate.
    plan_builder: ReadPlanBuilder,
    /// Offset/limit budget. Up to `StartData` this is the budget to apply to
    /// this row group; afterwards it holds the budget remaining for later
    /// row groups.
    budget: RowBudget,
}
52
/// This is the inner state machine for reading a single row group.
#[derive(Debug)]
enum RowGroupDecoderState {
    /// A row group has been assigned (via `next_row_group`) but nothing has
    /// been planned yet
    Start {
        row_group_info: RowGroupInfo,
    },
    /// Planning filters, but haven't yet requested data to evaluate them
    Filters {
        row_group_info: RowGroupInfo,
        /// Any previously read column chunk data from prior filters
        column_chunks: Option<Vec<Option<Arc<ColumnChunkData>>>>,
        /// The filter (positioned at its current predicate) and predicate cache info
        filter_info: FilterInfo,
    },
    /// Needs data to evaluate current filter
    WaitingOnFilterData {
        row_group_info: RowGroupInfo,
        /// The filter (positioned at its current predicate) and predicate cache info
        filter_info: FilterInfo,
        /// Byte ranges required to evaluate the current predicate
        data_request: DataRequest,
    },
    /// Know what data to actually read, after all predicates
    StartData {
        row_group_info: RowGroupInfo,
        /// Any previously read column chunk data from the filtering phase
        column_chunks: Option<Vec<Option<Arc<ColumnChunkData>>>>,
        /// Any cached filter results
        cache_info: Option<CacheInfo>,
    },
    /// Needs data to proceed with reading the output
    WaitingOnData {
        row_group_info: RowGroupInfo,
        /// Byte ranges required to decode the output projection
        data_request: DataRequest,
        /// Any cached filter results
        cache_info: Option<CacheInfo>,
    },
    /// Finished (or not yet started) reading this group
    Finished,
}
90
/// Running offset/limit budget shared across row groups.
///
/// `None` in either field means "no offset" / "no limit" respectively.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub(crate) struct RowBudget {
    /// Selected rows still to be skipped before output is produced.
    offset: Option<usize>,
    /// Output rows still permitted; `Some(0)` means the budget is exhausted.
    limit: Option<usize>,
}
97
98impl RowBudget {
99    pub(crate) fn new(offset: Option<usize>, limit: Option<usize>) -> Self {
100        Self { offset, limit }
101    }
102
103    pub(crate) fn is_exhausted(self) -> bool {
104        matches!(self.limit, Some(0))
105    }
106
107    /// The offset still to be skipped before the next readable row group.
108    pub(crate) fn offset(self) -> Option<usize> {
109        self.offset
110    }
111
112    /// The number of output rows still permitted across the remaining row groups.
113    pub(crate) fn limit(self) -> Option<usize> {
114        self.limit
115    }
116
117    /// Returns how many selected rows remain after applying this budget.
118    pub(crate) fn rows_after(self, rows_before_budget: usize) -> usize {
119        let rows_after_offset = rows_before_budget.saturating_sub(self.offset.unwrap_or(0));
120        match self.limit {
121            Some(limit) => rows_after_offset.min(limit),
122            None => rows_after_offset,
123        }
124    }
125
126    /// Returns the number of selected rows needed before applying the offset.
127    fn selected_row_limit(self) -> Option<usize> {
128        self.limit
129            .map(|limit| limit.saturating_add(self.offset.unwrap_or(0)))
130    }
131
132    fn apply_to_plan(self, plan_builder: ReadPlanBuilder, row_count: usize) -> BudgetedReadPlan {
133        let rows_before_budget = plan_builder.num_rows_selected().unwrap_or(row_count);
134        let plan_builder = plan_builder
135            .limited(row_count)
136            .with_offset(self.offset)
137            .with_limit(self.limit)
138            .build_limited();
139        let rows_after_budget = self.rows_after(rows_before_budget);
140
141        BudgetedReadPlan {
142            plan_builder,
143            rows_before_budget,
144            rows_after_budget,
145            remaining_budget: self.advance(rows_before_budget, rows_after_budget),
146        }
147    }
148
149    /// Advance the budget past one row group.
150    ///
151    /// `rows_before_budget` is the number of rows selected before applying the
152    /// budget, and `rows_after_budget` is the number retained for output from
153    /// this row group.
154    pub(crate) fn advance(mut self, rows_before_budget: usize, rows_after_budget: usize) -> Self {
155        if let Some(offset) = &mut self.offset {
156            // Reduction is either because of offset or limit, as limit is applied
157            // after offset has been "exhausted" can just use saturating sub here.
158            *offset = offset.saturating_sub(rows_before_budget - rows_after_budget);
159        }
160
161        if rows_after_budget != 0 {
162            if let Some(limit) = &mut self.limit {
163                *limit -= rows_after_budget;
164            }
165        }
166
167        self
168    }
169}
170
/// A row group's read plan after its share of a [`RowBudget`] has been applied,
/// as produced by `RowBudget::apply_to_plan`.
#[derive(Debug)]
struct BudgetedReadPlan {
    /// Read plan after applying this row group's share of the offset/limit budget.
    plan_builder: ReadPlanBuilder,
    /// Number of rows selected by row selection and predicates before applying
    /// this row group's offset/limit budget.
    rows_before_budget: usize,
    /// Number of selected rows that remain to be read after applying this row
    /// group's offset/limit budget.
    rows_after_budget: usize,
    /// Budget remaining for later row groups.
    remaining_budget: RowBudget,
}
184
/// Outcome of [`RowGroupReaderBuilder::try_build`] for the active row group.
#[derive(Debug)]
pub(crate) enum RowGroupBuildResult {
    /// The active row group is complete without producing a reader.
    Finished {
        /// Budget remaining after applying this row group's selection.
        remaining_budget: RowBudget,
    },
    /// More bytes are needed before the active row group can make progress.
    NeedsData(Vec<Range<u64>>),
    /// The active row group produced a reader.
    Data {
        batch_reader: ParquetRecordBatchReader,
        /// Budget remaining after applying this row group's selection.
        remaining_budget: RowBudget,
    },
}
201
/// Result of a state transition
#[derive(Debug)]
struct NextState {
    /// state the builder moves to after this transition
    next_state: RowGroupDecoderState,
    /// result to return, if any
    ///
    /// * `Some`: the processing should stop and return the result
    /// * `None`: processing should continue
    result: Option<RowGroupBuildResult>,
}
212
213impl NextState {
214    /// The next state with no result.
215    ///
216    /// This indicates processing should continue
217    fn again(next_state: RowGroupDecoderState) -> Self {
218        Self {
219            next_state,
220            result: None,
221        }
222    }
223
224    /// Create a NextState with a result that should be returned
225    fn result(next_state: RowGroupDecoderState, result: RowGroupBuildResult) -> Self {
226        Self {
227            next_state,
228            result: Some(result),
229        }
230    }
231}
232
/// Builder for [`ParquetRecordBatchReader`] for a single row group
///
/// This struct drives the main state machine for decoding each row group -- it
/// determines what data is needed, and then assembles the
/// `ParquetRecordBatchReader` when all data is available.
#[derive(Debug)]
pub(crate) struct RowGroupReaderBuilder {
    /// The output batch size
    batch_size: usize,

    /// What columns to project (produce in each output batch)
    projection: ProjectionMask,

    /// The Parquet file metadata
    metadata: Arc<ParquetMetaData>,

    /// Top level parquet schema and arrow schema mapping
    fields: Option<Arc<ParquetField>>,

    /// Optional filter
    ///
    /// Taken at the start of each row group. When it has predicates it is
    /// moved into a `FilterInfo` for evaluation and put back once evaluation
    /// completes or the row group is ruled out, so it can be `None` while a
    /// row group is mid-evaluation.
    filter: Option<RowFilter>,

    /// The size in bytes of the predicate cache to use
    ///
    /// See [`RowGroupCache`] for details.
    max_predicate_cache_size: usize,

    /// The metrics collector
    metrics: ArrowReaderMetrics,

    /// Strategy for materialising row selections
    row_selection_policy: RowSelectionPolicy,

    /// Current state of the decoder.
    ///
    /// It is taken when processing, and must be put back before returning;
    /// it is a bug if it is not put back after transitioning states.
    state: Option<RowGroupDecoderState>,

    /// The underlying data store
    buffers: PushBuffers,
}
275
/// The parts of a [`RowGroupReaderBuilder`] needed to rebuild it, recovered by
/// [`RowGroupReaderBuilder::into_parts`].
///
/// `metadata` is not included: it is a whole-file property carried alongside
/// `schema` in `RemainingRowGroupsParts`.
#[derive(Debug)]
pub(crate) struct RowGroupReaderBuilderParts {
    /// The output batch size
    pub batch_size: usize,
    /// What columns to project (produce in each output batch)
    pub projection: ProjectionMask,
    /// Top level parquet schema and arrow schema mapping
    pub fields: Option<Arc<ParquetField>>,
    /// Optional filter, as held by the builder when it was decomposed
    pub filter: Option<RowFilter>,
    /// The size in bytes of the predicate cache to use
    pub max_predicate_cache_size: usize,
    /// The metrics collector
    pub metrics: ArrowReaderMetrics,
    /// Strategy for materialising row selections
    pub row_selection_policy: RowSelectionPolicy,
    /// Bytes already pushed into the decoder, carried across a rebuild so they
    /// are not re-requested.
    pub buffers: PushBuffers,
}
294
295impl RowGroupReaderBuilder {
296    /// Create a new RowGroupReaderBuilder
297    #[expect(clippy::too_many_arguments)]
298    pub(crate) fn new(
299        batch_size: usize,
300        projection: ProjectionMask,
301        metadata: Arc<ParquetMetaData>,
302        fields: Option<Arc<ParquetField>>,
303        filter: Option<RowFilter>,
304        metrics: ArrowReaderMetrics,
305        max_predicate_cache_size: usize,
306        buffers: PushBuffers,
307        row_selection_policy: RowSelectionPolicy,
308    ) -> Self {
309        Self {
310            batch_size,
311            projection,
312            metadata,
313            fields,
314            filter,
315            metrics,
316            max_predicate_cache_size,
317            row_selection_policy,
318            state: Some(RowGroupDecoderState::Finished),
319            buffers,
320        }
321    }
322
323    /// Decompose into [`RowGroupReaderBuilderParts`] so the builder can be
324    /// reconstructed. The runtime decode `state` is discarded; `metadata` is
325    /// recovered from the frontier instead (see `RemainingRowGroups::into_parts`).
326    pub(crate) fn into_parts(self) -> RowGroupReaderBuilderParts {
327        // If a new field is added to `RowGroupReaderBuilder`, it must be added here and in `RowGroupReaderBuilderParts`,
328        // or at least evaluate how it should be handled in the decomposition and reconstruction of the builder.
329        let Self {
330            batch_size,
331            projection,
332            metadata: _,
333            fields,
334            filter,
335            max_predicate_cache_size,
336            metrics,
337            row_selection_policy,
338            state: _,
339            buffers,
340        } = self;
341        RowGroupReaderBuilderParts {
342            batch_size,
343            projection,
344            fields,
345            filter,
346            max_predicate_cache_size,
347            metrics,
348            row_selection_policy,
349            buffers,
350        }
351    }
352
353    /// Push new data buffers that can be used to satisfy pending requests
354    pub fn push_data(&mut self, ranges: Vec<Range<u64>>, buffers: Vec<Bytes>) {
355        self.buffers.push_ranges(ranges, buffers);
356    }
357
358    /// True iff the inner state is `Finished`. This is the only state in
359    /// which it is safe to decompose the builder via [`Self::into_parts`],
360    /// because no `RowGroupInfo`, `FilterInfo`, or in-flight `DataRequest`
361    /// is referencing the row-group-scoped decode state.
362    pub(crate) fn is_finished(&self) -> bool {
363        matches!(self.state, Some(RowGroupDecoderState::Finished))
364    }
365
366    /// Returns the total number of buffered bytes available
367    pub fn buffered_bytes(&self) -> u64 {
368        self.buffers.buffered_bytes()
369    }
370
371    /// Clear any staged ranges currently buffered for future decode work.
372    pub fn clear_all_ranges(&mut self) {
373        self.buffers.clear_all_ranges();
374    }
375
376    /// take the current state, leaving None in its place.
377    ///
378    /// Returns an error if there the state wasn't put back after the previous
379    /// call to [`Self::take_state`].
380    ///
381    /// Any code that calls this method must ensure that the state is put back
382    /// before returning, otherwise the reader will error next time it is called
383    fn take_state(&mut self) -> Result<RowGroupDecoderState, ParquetError> {
384        self.state.take().ok_or_else(|| {
385            ParquetError::General(String::from(
386                "Internal Error: RowGroupReader in invalid state",
387            ))
388        })
389    }
390
391    /// Returns true if this builder is currently decoding a row group.
392    pub(crate) fn has_active_row_group(&self) -> bool {
393        !matches!(self.state, Some(RowGroupDecoderState::Finished))
394    }
395
396    /// Setup this reader to read the next row group
397    pub(crate) fn next_row_group(
398        &mut self,
399        row_group_idx: usize,
400        row_count: usize,
401        selection: Option<RowSelection>,
402        budget: RowBudget,
403    ) -> Result<(), ParquetError> {
404        let state = self.take_state()?;
405        if !matches!(state, RowGroupDecoderState::Finished) {
406            return Err(ParquetError::General(format!(
407                "Internal Error: next_row_group called while still reading a row group. Expected Finished state, got {state:?}"
408            )));
409        }
410        let plan_builder = ReadPlanBuilder::new(self.batch_size)
411            .with_selection(selection)
412            .with_row_selection_policy(self.row_selection_policy);
413
414        let row_group_info = RowGroupInfo {
415            row_group_idx,
416            row_count,
417            plan_builder,
418            budget,
419        };
420
421        self.state = Some(RowGroupDecoderState::Start { row_group_info });
422        Ok(())
423    }
424
425    /// Try to build the next `ParquetRecordBatchReader` for the active row group.
426    ///
427    /// Returns [`RowGroupBuildResult::NeedsData`] if more data is needed,
428    /// [`RowGroupBuildResult::Data`] if a reader is ready, or
429    /// [`RowGroupBuildResult::Finished`] if the row group completed without
430    /// producing a reader.
431    pub(crate) fn try_build(&mut self) -> Result<RowGroupBuildResult, ParquetError> {
432        loop {
433            let current_state = self.take_state()?;
434            // Try to transition the decoder.
435            match self.try_transition(current_state)? {
436                // Either produced a batch reader, needed input, or finished
437                NextState {
438                    next_state,
439                    result: Some(result),
440                } => {
441                    // put back the next state
442                    self.state = Some(next_state);
443                    return Ok(result);
444                }
445                // completed one internal state, maybe can proceed further
446                NextState {
447                    next_state,
448                    result: None,
449                } => {
450                    // continue processing
451                    self.state = Some(next_state);
452                }
453            }
454        }
455    }
456
457    /// Current state --> next state + optional output
458    ///
459    /// This is the main state transition function for the row group reader
460    /// and encodes the row group decoding state machine.
461    ///
462    /// # Notes
463    ///
464    /// This structure is used to reduce the indentation level of the main loop
465    /// in try_build
466    fn try_transition(
467        &mut self,
468        current_state: RowGroupDecoderState,
469    ) -> Result<NextState, ParquetError> {
470        let result = match current_state {
471            RowGroupDecoderState::Start { row_group_info } => {
472                debug_assert!(
473                    !row_group_info.budget.is_exhausted(),
474                    "RowGroupFrontier should not hand off row groups after the output limit is exhausted"
475                );
476
477                let column_chunks = None; // no prior column chunks
478
479                let Some(filter) = self.filter.take() else {
480                    // no filter, start trying to read data immediately
481                    return Ok(NextState::again(RowGroupDecoderState::StartData {
482                        row_group_info,
483                        column_chunks,
484                        cache_info: None,
485                    }));
486                };
487                // no predicates in filter, so start reading immediately
488                if filter.predicates.is_empty() {
489                    return Ok(NextState::again(RowGroupDecoderState::StartData {
490                        row_group_info,
491                        column_chunks,
492                        cache_info: None,
493                    }));
494                };
495
496                // we have predicates to evaluate
497                let cache_projection =
498                    self.compute_cache_projection(row_group_info.row_group_idx, &filter);
499
500                let cache_info = CacheInfo::new(
501                    cache_projection,
502                    Arc::new(RwLock::new(RowGroupCache::new(
503                        self.batch_size,
504                        self.max_predicate_cache_size,
505                    ))),
506                );
507
508                let filter_info = FilterInfo::new(filter, cache_info);
509                NextState::again(RowGroupDecoderState::Filters {
510                    row_group_info,
511                    filter_info,
512                    column_chunks,
513                })
514            }
515            // need to evaluate filters
516            RowGroupDecoderState::Filters {
517                row_group_info,
518                column_chunks,
519                filter_info,
520            } => {
521                let RowGroupInfo {
522                    row_group_idx,
523                    row_count,
524                    plan_builder,
525                    budget,
526                } = row_group_info;
527
528                // If nothing is selected, we are done with this row group
529                if !plan_builder.selects_any() {
530                    // ruled out entire row group
531                    self.filter = Some(filter_info.into_filter());
532                    return Ok(NextState::result(
533                        RowGroupDecoderState::Finished,
534                        RowGroupBuildResult::Finished {
535                            remaining_budget: budget,
536                        },
537                    ));
538                }
539
540                // Make a request for the data needed to evaluate the current predicate
541                let predicate = filter_info.current();
542
543                // need to fetch pages the column needs for decoding, figure
544                // that out based on the current selection and projection
545                let data_request = DataRequestBuilder::new(
546                    row_group_idx,
547                    row_count,
548                    self.batch_size,
549                    &self.metadata,
550                    predicate.projection(), // use the predicate's projection
551                )
552                .with_selection(plan_builder.selection())
553                // Fetch predicate columns; expand selection only for cached predicate columns
554                .with_cache_projection(Some(filter_info.cache_projection()))
555                .with_column_chunks(column_chunks)
556                .build();
557
558                let row_group_info = RowGroupInfo {
559                    row_group_idx,
560                    row_count,
561                    plan_builder,
562                    budget,
563                };
564
565                NextState::again(RowGroupDecoderState::WaitingOnFilterData {
566                    row_group_info,
567                    filter_info,
568                    data_request,
569                })
570            }
571            RowGroupDecoderState::WaitingOnFilterData {
572                row_group_info,
573                data_request,
574                mut filter_info,
575            } => {
576                // figure out what ranges we still need
577                let needed_ranges = data_request.needed_ranges(&self.buffers);
578                if !needed_ranges.is_empty() {
579                    // still need data
580                    return Ok(NextState::result(
581                        RowGroupDecoderState::WaitingOnFilterData {
582                            row_group_info,
583                            filter_info,
584                            data_request,
585                        },
586                        RowGroupBuildResult::NeedsData(needed_ranges),
587                    ));
588                }
589
590                // otherwise we have all the data we need to evaluate the predicate
591                let RowGroupInfo {
592                    row_group_idx,
593                    row_count,
594                    mut plan_builder,
595                    budget,
596                } = row_group_info;
597
598                let predicate = filter_info.current();
599
600                let row_group = data_request.try_into_in_memory_row_group(
601                    row_group_idx,
602                    row_count,
603                    &self.metadata,
604                    predicate.projection(),
605                    &mut self.buffers,
606                )?;
607
608                let cache_options = filter_info.cache_builder().producer();
609
610                let array_reader = ArrayReaderBuilder::new(&row_group, &self.metrics)
611                    .with_batch_size(self.batch_size)
612                    .with_cache_options(Some(&cache_options))
613                    .with_parquet_metadata(&self.metadata)
614                    .build_array_reader(self.fields.as_deref(), predicate.projection())?;
615
616                // Reset to original policy before each predicate so the override
617                // can detect page skipping for THIS predicate's columns.
618                // Without this reset, a prior predicate's override (e.g. Mask)
619                // carries forward and the check returns early, missing unfetched
620                // pages for subsequent predicates.
621                plan_builder = plan_builder.with_row_selection_policy(self.row_selection_policy);
622
623                // Prepare to evaluate the filter.
624                // Note: first update the selection strategy to properly handle any pages
625                // pruned during fetch
626                plan_builder = override_selector_strategy_if_needed(
627                    plan_builder,
628                    predicate.projection(),
629                    self.row_group_offset_index(row_group_idx),
630                );
631
632                // When this is the final predicate in the chain and an output
633                // limit is set, tell the filter evaluation to stop once enough
634                // matching rows have been accumulated.
635                let predicate_limit = filter_info
636                    .is_last()
637                    .then(|| budget.selected_row_limit())
638                    .flatten();
639
640                // Evaluate the filter via `with_predicate_options`, opting into
641                // early termination when this is the final predicate and an
642                // output limit was set.
643                let mut predicate_options =
644                    PredicateOptions::new(array_reader, filter_info.current_mut());
645                if let Some(limit) = predicate_limit {
646                    predicate_options = predicate_options.with_limit(limit, row_count);
647                }
648                plan_builder = plan_builder.with_predicate_options(predicate_options)?;
649
650                let row_group_info = RowGroupInfo {
651                    row_group_idx,
652                    row_count,
653                    plan_builder,
654                    budget,
655                };
656
657                // Take back the column chunks that were read
658                let column_chunks = Some(row_group.column_chunks);
659
660                // advance to the next predicate, if any
661                match filter_info.advance() {
662                    AdvanceResult::Continue(filter_info) => {
663                        NextState::again(RowGroupDecoderState::Filters {
664                            row_group_info,
665                            column_chunks,
666                            filter_info,
667                        })
668                    }
669                    // done with predicates, proceed to reading data
670                    AdvanceResult::Done(filter, cache_info) => {
671                        // remember we need to put back the filter
672                        assert!(self.filter.is_none());
673                        self.filter = Some(filter);
674                        NextState::again(RowGroupDecoderState::StartData {
675                            row_group_info,
676                            column_chunks,
677                            cache_info: Some(cache_info),
678                        })
679                    }
680                }
681            }
682            RowGroupDecoderState::StartData {
683                row_group_info,
684                column_chunks,
685                cache_info,
686            } => {
687                let RowGroupInfo {
688                    row_group_idx,
689                    row_count,
690                    plan_builder,
691                    budget,
692                } = row_group_info;
693
694                let BudgetedReadPlan {
695                    mut plan_builder,
696                    rows_before_budget,
697                    rows_after_budget,
698                    remaining_budget,
699                } = budget.apply_to_plan(plan_builder, row_count);
700
701                if rows_before_budget == 0 {
702                    // ruled out entire row group
703                    return Ok(NextState::result(
704                        RowGroupDecoderState::Finished,
705                        RowGroupBuildResult::Finished { remaining_budget },
706                    ));
707                }
708
709                if rows_after_budget == 0 {
710                    // no rows left after applying limit/offset
711                    return Ok(NextState::result(
712                        RowGroupDecoderState::Finished,
713                        RowGroupBuildResult::Finished { remaining_budget },
714                    ));
715                }
716
717                let data_request = DataRequestBuilder::new(
718                    row_group_idx,
719                    row_count,
720                    self.batch_size,
721                    &self.metadata,
722                    &self.projection,
723                )
724                .with_selection(plan_builder.selection())
725                .with_column_chunks(column_chunks)
726                // Final projection fetch shouldn't expand selection for cache
727                // so don't call with_cache_projection here
728                .build();
729
730                plan_builder = plan_builder.with_row_selection_policy(self.row_selection_policy);
731
732                plan_builder = override_selector_strategy_if_needed(
733                    plan_builder,
734                    &self.projection,
735                    self.row_group_offset_index(row_group_idx),
736                );
737
738                let row_group_info = RowGroupInfo {
739                    row_group_idx,
740                    row_count,
741                    plan_builder,
742                    budget: remaining_budget,
743                };
744
745                NextState::again(RowGroupDecoderState::WaitingOnData {
746                    row_group_info,
747                    data_request,
748                    cache_info,
749                })
750            }
751            // Waiting on data to proceed with reading the output
752            RowGroupDecoderState::WaitingOnData {
753                row_group_info,
754                data_request,
755                cache_info,
756            } => {
757                let needed_ranges = data_request.needed_ranges(&self.buffers);
758                if !needed_ranges.is_empty() {
759                    // still need data
760                    return Ok(NextState::result(
761                        RowGroupDecoderState::WaitingOnData {
762                            row_group_info,
763                            data_request,
764                            cache_info,
765                        },
766                        RowGroupBuildResult::NeedsData(needed_ranges),
767                    ));
768                }
769
770                // otherwise we have all the data we need to proceed
771                let RowGroupInfo {
772                    row_group_idx,
773                    row_count,
774                    plan_builder,
775                    budget,
776                } = row_group_info;
777
778                let row_group = data_request.try_into_in_memory_row_group(
779                    row_group_idx,
780                    row_count,
781                    &self.metadata,
782                    &self.projection,
783                    &mut self.buffers,
784                )?;
785
786                let plan = plan_builder.build();
787
788                // if we have any cached results, connect them up
789                let array_reader_builder = ArrayReaderBuilder::new(&row_group, &self.metrics)
790                    .with_batch_size(self.batch_size)
791                    .with_parquet_metadata(&self.metadata);
792                let array_reader = if let Some(cache_info) = cache_info.as_ref() {
793                    let cache_options: CacheOptions = cache_info.builder().consumer();
794                    array_reader_builder
795                        .with_cache_options(Some(&cache_options))
796                        .build_array_reader(self.fields.as_deref(), &self.projection)
797                } else {
798                    array_reader_builder
799                        .build_array_reader(self.fields.as_deref(), &self.projection)
800                }?;
801
802                let reader = ParquetRecordBatchReader::new(array_reader, plan);
803                NextState::result(
804                    RowGroupDecoderState::Finished,
805                    RowGroupBuildResult::Data {
806                        batch_reader: reader,
807                        remaining_budget: budget,
808                    },
809                )
810            }
811            RowGroupDecoderState::Finished => {
812                return Err(ParquetError::General(String::from(
813                    "Internal Error: try_build called without an active row group",
814                )));
815            }
816        };
817        Ok(result)
818    }
819
820    /// Which columns should be cached?
821    ///
822    /// Returns the columns that are used by the filters *and* then used in the
823    /// final projection, excluding any nested columns.
824    fn compute_cache_projection(&self, row_group_idx: usize, filter: &RowFilter) -> ProjectionMask {
825        let meta = self.metadata.row_group(row_group_idx);
826        match self.compute_cache_projection_inner(filter) {
827            Some(projection) => projection,
828            None => ProjectionMask::none(meta.columns().len()),
829        }
830    }
831
832    fn compute_cache_projection_inner(&self, filter: &RowFilter) -> Option<ProjectionMask> {
833        // Do not compute the projection mask if the predicate cache is disabled
834        if self.max_predicate_cache_size == 0 {
835            return None;
836        }
837        let mut cache_projection = filter.predicates.first()?.projection().clone();
838        for predicate in filter.predicates.iter() {
839            cache_projection.union(predicate.projection());
840        }
841        cache_projection.intersect(&self.projection);
842        self.exclude_nested_columns_from_cache(&cache_projection)
843    }
844
845    /// Exclude leaves belonging to roots that span multiple parquet leaves (i.e. nested columns)
846    fn exclude_nested_columns_from_cache(&self, mask: &ProjectionMask) -> Option<ProjectionMask> {
847        mask.without_nested_types(self.metadata.file_metadata().schema_descr())
848    }
849
850    /// Get the offset index for the specified row group, if any
851    fn row_group_offset_index(&self, row_group_idx: usize) -> Option<&[OffsetIndexMetaData]> {
852        self.metadata
853            .offset_index()
854            .filter(|index| !index.is_empty())
855            .and_then(|index| index.get(row_group_idx))
856            .map(|columns| columns.as_slice())
857    }
858}
859
860/// Override the selection strategy if needed.
861///
862/// Some pages can be skipped during row-group construction if they are not read
863/// by the selections. This means that the data pages for those rows are never
864/// loaded and definition/repetition levels are never read. When using
865/// `RowSelections` selection works because `skip_records()` handles this
866/// case and skips the page accordingly.
867///
868/// However, with the current mask design, all values must be read and decoded
869/// and then a mask filter is applied. Thus if any pages are skipped during
870/// row-group construction, the data pages are missing and cannot be decoded.
871///
872/// A simple example:
873/// * the page size is 2, the mask is 100001, row selection should be read(1) skip(4) read(1)
874/// * the `ColumnChunkData` would be page1(10), page2(skipped), page3(01)
875///
876/// Using the row selection to skip(4), page2 won't be read at all, so in this
877/// case we can't decode all the rows and apply a mask. To correctly apply the
878/// bit mask, we need all 6 values be read, but page2 is not in memory.
879fn override_selector_strategy_if_needed(
880    plan_builder: ReadPlanBuilder,
881    projection_mask: &ProjectionMask,
882    offset_index: Option<&[OffsetIndexMetaData]>,
883) -> ReadPlanBuilder {
884    // override only applies to Auto policy, If the policy is already Mask or Selectors, respect that
885    let RowSelectionPolicy::Auto { .. } = plan_builder.row_selection_policy() else {
886        return plan_builder;
887    };
888
889    let preferred_strategy = plan_builder.resolve_selection_strategy();
890
891    let force_selectors = matches!(preferred_strategy, RowSelectionStrategy::Mask)
892        && plan_builder.selection().is_some_and(|selection| {
893            selection.should_force_selectors(projection_mask, offset_index)
894        });
895
896    let resolved_strategy = if force_selectors {
897        RowSelectionStrategy::Selectors
898    } else {
899        preferred_strategy
900    };
901
902    // override the plan builder strategy with the resolved one
903    let new_policy = match resolved_strategy {
904        RowSelectionStrategy::Mask => RowSelectionPolicy::Mask,
905        RowSelectionStrategy::Selectors => RowSelectionPolicy::Selectors,
906    };
907
908    plan_builder.with_row_selection_policy(new_policy)
909}
910
#[cfg(test)]
mod tests {
    use super::*;
    use crate::arrow::arrow_reader::{RowSelection, RowSelector};

    /// Pins the in-memory size of `RowGroupDecoderState`, so any growth of the
    /// state type shows up as a failing test.
    #[test]
    fn test_structure_size() {
        let state_bytes = std::mem::size_of::<RowGroupDecoderState>();
        assert_eq!(state_bytes, 232);
    }

    /// An offset larger than a row group consumes that whole row group. The
    /// leftover offset, together with the untouched limit, carries into the
    /// next row group, where the limit is then applied.
    #[test]
    fn test_row_budget_offset_limit_across_row_groups() {
        // Row group 1: an offset of 225 covers all 200 rows. Nothing is
        // selected, and 25 rows of offset remain.
        let initial_budget = RowBudget::new(Some(225), Some(20));
        let first = initial_budget.apply_to_plan(ReadPlanBuilder::new(1024), 200);
        assert_eq!(first.rows_before_budget, 200);
        assert_eq!(first.rows_after_budget, 0);
        assert_eq!(first.remaining_budget, RowBudget::new(Some(25), Some(20)));
        assert_eq!(first.plan_builder.num_rows_selected(), Some(0));

        // Row group 2: the remaining offset is used up, and the limit of 20
        // selects 20 rows and is fully spent.
        let second = first
            .remaining_budget
            .apply_to_plan(ReadPlanBuilder::new(1024), 200);
        assert_eq!(second.rows_before_budget, 200);
        assert_eq!(second.rows_after_budget, 20);
        assert_eq!(second.remaining_budget, RowBudget::new(Some(0), Some(0)));
        assert_eq!(second.plan_builder.num_rows_selected(), Some(20));
    }

    /// With no offset, only the limit applies: 20 of the 200 rows are selected
    /// and the limit is exhausted.
    #[test]
    fn test_row_budget_limit_only() {
        let limit_only = RowBudget::new(None, Some(20));
        let budgeted = limit_only.apply_to_plan(ReadPlanBuilder::new(1024), 200);
        assert_eq!(budgeted.rows_before_budget, 200);
        assert_eq!(budgeted.rows_after_budget, 20);
        assert_eq!(budgeted.remaining_budget, RowBudget::new(None, Some(0)));
        assert_eq!(budgeted.plan_builder.num_rows_selected(), Some(20));
    }

    /// A selection that skips every row consumes none of the budget: the offset
    /// and the limit are both carried forward unchanged.
    #[test]
    fn test_row_budget_empty_selection() {
        let skip_everything: RowSelection = vec![RowSelector::skip(200)].into();
        let plan = ReadPlanBuilder::new(1024).with_selection(Some(skip_everything));
        let budgeted = RowBudget::new(Some(10), Some(20)).apply_to_plan(plan, 200);
        assert_eq!(budgeted.rows_before_budget, 0);
        assert_eq!(budgeted.rows_after_budget, 0);
        assert_eq!(
            budgeted.remaining_budget,
            RowBudget::new(Some(10), Some(20))
        );
        assert_eq!(budgeted.plan_builder.num_rows_selected(), Some(0));
    }
}