Skip to main content

parquet/arrow/push_decoder/reader_builder/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18mod data;
19mod filter;
20
21use crate::arrow::ProjectionMask;
22use crate::arrow::array_reader::{ArrayReaderBuilder, CacheOptions, RowGroupCache};
23use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
24use crate::arrow::arrow_reader::selection::RowSelectionStrategy;
25use crate::arrow::arrow_reader::{
26    ParquetRecordBatchReader, PredicateOptions, ReadPlanBuilder, RowFilter, RowSelection,
27    RowSelectionPolicy,
28};
29use crate::arrow::in_memory_row_group::ColumnChunkData;
30use crate::arrow::push_decoder::reader_builder::data::DataRequestBuilder;
31use crate::arrow::push_decoder::reader_builder::filter::CacheInfo;
32use crate::arrow::schema::ParquetField;
33use crate::errors::ParquetError;
34use crate::file::metadata::ParquetMetaData;
35use crate::file::page_index::offset_index::OffsetIndexMetaData;
36use crate::util::push_buffers::PushBuffers;
37use bytes::Bytes;
38use data::DataRequest;
39use filter::AdvanceResult;
40use filter::FilterInfo;
41use std::ops::Range;
42use std::sync::{Arc, RwLock};
43
/// The current row group being read, its read plan, and its offset/limit budget.
#[derive(Debug)]
struct RowGroupInfo {
    /// Index of this row group within the file's metadata
    row_group_idx: usize,
    /// Total number of rows in this row group
    row_count: usize,
    /// Builder for the plan describing which rows of this row group to decode
    plan_builder: ReadPlanBuilder,
    /// Remaining offset/limit budget to apply to this row group's output
    budget: RowBudget,
}
52
/// This is the inner state machine for reading a single row group.
#[derive(Debug)]
enum RowGroupDecoderState {
    /// Initial state: nothing has been planned or requested for this row group yet
    Start {
        row_group_info: RowGroupInfo,
    },
    /// Planning filters, but haven't yet requested data to evaluate them
    Filters {
        row_group_info: RowGroupInfo,
        /// Any previously read column chunk data from prior filters
        column_chunks: Option<Vec<Option<Arc<ColumnChunkData>>>>,
        /// Tracks the predicate chain and which predicate is current
        filter_info: FilterInfo,
    },
    /// Needs data to evaluate current filter
    WaitingOnFilterData {
        row_group_info: RowGroupInfo,
        filter_info: FilterInfo,
        /// The outstanding request for the bytes the current predicate needs
        data_request: DataRequest,
    },
    /// Know what data to actually read, after all predicates
    StartData {
        row_group_info: RowGroupInfo,
        /// Any previously read column chunk data from the filtering phase
        column_chunks: Option<Vec<Option<Arc<ColumnChunkData>>>>,
        /// Any cached filter results
        cache_info: Option<CacheInfo>,
    },
    /// Needs data to proceed with reading the output
    WaitingOnData {
        row_group_info: RowGroupInfo,
        /// The outstanding request for the bytes of the output projection
        data_request: DataRequest,
        /// Any cached filter results
        cache_info: Option<CacheInfo>,
    },
    /// Finished (or not yet started) reading this group
    Finished,
}
90
/// Running offset/limit budget shared across row groups.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub(crate) struct RowBudget {
    /// Number of selected rows still to be skipped before any are output
    offset: Option<usize>,
    /// Maximum number of selected rows still allowed to be output
    limit: Option<usize>,
}
97
98impl RowBudget {
99    pub(crate) fn new(offset: Option<usize>, limit: Option<usize>) -> Self {
100        Self { offset, limit }
101    }
102
103    pub(crate) fn is_exhausted(self) -> bool {
104        matches!(self.limit, Some(0))
105    }
106
107    /// Returns how many selected rows remain after applying this budget.
108    pub(crate) fn rows_after(self, rows_before_budget: usize) -> usize {
109        let rows_after_offset = rows_before_budget.saturating_sub(self.offset.unwrap_or(0));
110        match self.limit {
111            Some(limit) => rows_after_offset.min(limit),
112            None => rows_after_offset,
113        }
114    }
115
116    /// Returns the number of selected rows needed before applying the offset.
117    fn selected_row_limit(self) -> Option<usize> {
118        self.limit
119            .map(|limit| limit.saturating_add(self.offset.unwrap_or(0)))
120    }
121
122    fn apply_to_plan(self, plan_builder: ReadPlanBuilder, row_count: usize) -> BudgetedReadPlan {
123        let rows_before_budget = plan_builder.num_rows_selected().unwrap_or(row_count);
124        let plan_builder = plan_builder
125            .limited(row_count)
126            .with_offset(self.offset)
127            .with_limit(self.limit)
128            .build_limited();
129        let rows_after_budget = self.rows_after(rows_before_budget);
130
131        BudgetedReadPlan {
132            plan_builder,
133            rows_before_budget,
134            rows_after_budget,
135            remaining_budget: self.advance(rows_before_budget, rows_after_budget),
136        }
137    }
138
139    /// Advance the budget past one row group.
140    ///
141    /// `rows_before_budget` is the number of rows selected before applying the
142    /// budget, and `rows_after_budget` is the number retained for output from
143    /// this row group.
144    pub(crate) fn advance(mut self, rows_before_budget: usize, rows_after_budget: usize) -> Self {
145        if let Some(offset) = &mut self.offset {
146            // Reduction is either because of offset or limit, as limit is applied
147            // after offset has been "exhausted" can just use saturating sub here.
148            *offset = offset.saturating_sub(rows_before_budget - rows_after_budget);
149        }
150
151        if rows_after_budget != 0 {
152            if let Some(limit) = &mut self.limit {
153                *limit -= rows_after_budget;
154            }
155        }
156
157        self
158    }
159}
160
/// A [`ReadPlanBuilder`] after one row group's share of the offset/limit
/// budget has been applied, together with the associated row accounting.
#[derive(Debug)]
struct BudgetedReadPlan {
    /// Read plan after applying this row group's share of the offset/limit budget.
    plan_builder: ReadPlanBuilder,
    /// Number of rows selected by row selection and predicates before applying
    /// this row group's offset/limit budget.
    rows_before_budget: usize,
    /// Number of selected rows that remain to be read after applying this row
    /// group's offset/limit budget.
    rows_after_budget: usize,
    /// Budget remaining for later row groups.
    remaining_budget: RowBudget,
}
174
/// The outcome of one attempt ([`RowGroupReaderBuilder::try_build`]) to make
/// progress on the active row group.
#[derive(Debug)]
pub(crate) enum RowGroupBuildResult {
    /// The active row group is complete without producing a reader.
    Finished {
        /// Budget remaining after applying this row group's selection.
        remaining_budget: RowBudget,
    },
    /// More bytes are needed before the active row group can make progress.
    NeedsData(Vec<Range<u64>>),
    /// The active row group produced a reader.
    Data {
        /// Reader that yields the decoded batches for this row group
        batch_reader: ParquetRecordBatchReader,
        /// Budget remaining after applying this row group's selection.
        remaining_budget: RowBudget,
    },
}
191
/// Result of a state transition
#[derive(Debug)]
struct NextState {
    /// The state the decoder should move to
    next_state: RowGroupDecoderState,
    /// result to return, if any
    ///
    /// * `Some`: the processing should stop and return the result
    /// * `None`: processing should continue
    result: Option<RowGroupBuildResult>,
}
202
203impl NextState {
204    /// The next state with no result.
205    ///
206    /// This indicates processing should continue
207    fn again(next_state: RowGroupDecoderState) -> Self {
208        Self {
209            next_state,
210            result: None,
211        }
212    }
213
214    /// Create a NextState with a result that should be returned
215    fn result(next_state: RowGroupDecoderState, result: RowGroupBuildResult) -> Self {
216        Self {
217            next_state,
218            result: Some(result),
219        }
220    }
221}
222
/// Builder for [`ParquetRecordBatchReader`] for a single row group
///
/// This struct drives the main state machine for decoding each row group -- it
/// determines what data is needed, and then assembles the
/// `ParquetRecordBatchReader` when all data is available.
#[derive(Debug)]
pub(crate) struct RowGroupReaderBuilder {
    /// The output batch size
    batch_size: usize,

    /// What columns to project (produce in each output batch)
    projection: ProjectionMask,

    /// The Parquet file metadata
    metadata: Arc<ParquetMetaData>,

    /// Top level parquet schema and arrow schema mapping
    fields: Option<Arc<ParquetField>>,

    /// Optional filter
    ///
    /// Taken while a row group's predicates are being evaluated and put back
    /// afterwards so it can be reused for subsequent row groups.
    filter: Option<RowFilter>,

    /// The size in bytes of the predicate cache to use
    ///
    /// See [`RowGroupCache`] for details. Zero disables the cache.
    max_predicate_cache_size: usize,

    /// The metrics collector
    metrics: ArrowReaderMetrics,

    /// Strategy for materialising row selections
    row_selection_policy: RowSelectionPolicy,

    /// Current state of the decoder.
    ///
    /// It is taken when processing, and must be put back before returning;
    /// it is a bug if it is not put back after transitioning states.
    state: Option<RowGroupDecoderState>,

    /// The underlying data store
    buffers: PushBuffers,
}
265
266impl RowGroupReaderBuilder {
267    /// Create a new RowGroupReaderBuilder
268    #[expect(clippy::too_many_arguments)]
269    pub(crate) fn new(
270        batch_size: usize,
271        projection: ProjectionMask,
272        metadata: Arc<ParquetMetaData>,
273        fields: Option<Arc<ParquetField>>,
274        filter: Option<RowFilter>,
275        metrics: ArrowReaderMetrics,
276        max_predicate_cache_size: usize,
277        buffers: PushBuffers,
278        row_selection_policy: RowSelectionPolicy,
279    ) -> Self {
280        Self {
281            batch_size,
282            projection,
283            metadata,
284            fields,
285            filter,
286            metrics,
287            max_predicate_cache_size,
288            row_selection_policy,
289            state: Some(RowGroupDecoderState::Finished),
290            buffers,
291        }
292    }
293
    /// Push new data buffers that can be used to satisfy pending requests
    ///
    /// Each entry of `ranges` gives the file byte range covered by the
    /// corresponding entry of `buffers`.
    pub fn push_data(&mut self, ranges: Vec<Range<u64>>, buffers: Vec<Bytes>) {
        self.buffers.push_ranges(ranges, buffers);
    }

    /// Returns the total number of buffered bytes available
    pub fn buffered_bytes(&self) -> u64 {
        self.buffers.buffered_bytes()
    }

    /// Clear any staged ranges currently buffered for future decode work.
    pub fn clear_all_ranges(&mut self) {
        self.buffers.clear_all_ranges();
    }
308
309    /// take the current state, leaving None in its place.
310    ///
311    /// Returns an error if there the state wasn't put back after the previous
312    /// call to [`Self::take_state`].
313    ///
314    /// Any code that calls this method must ensure that the state is put back
315    /// before returning, otherwise the reader will error next time it is called
316    fn take_state(&mut self) -> Result<RowGroupDecoderState, ParquetError> {
317        self.state.take().ok_or_else(|| {
318            ParquetError::General(String::from(
319                "Internal Error: RowGroupReader in invalid state",
320            ))
321        })
322    }
323
324    /// Returns true if this builder is currently decoding a row group.
325    pub(crate) fn has_active_row_group(&self) -> bool {
326        !matches!(self.state, Some(RowGroupDecoderState::Finished))
327    }
328
329    /// Setup this reader to read the next row group
330    pub(crate) fn next_row_group(
331        &mut self,
332        row_group_idx: usize,
333        row_count: usize,
334        selection: Option<RowSelection>,
335        budget: RowBudget,
336    ) -> Result<(), ParquetError> {
337        let state = self.take_state()?;
338        if !matches!(state, RowGroupDecoderState::Finished) {
339            return Err(ParquetError::General(format!(
340                "Internal Error: next_row_group called while still reading a row group. Expected Finished state, got {state:?}"
341            )));
342        }
343        let plan_builder = ReadPlanBuilder::new(self.batch_size)
344            .with_selection(selection)
345            .with_row_selection_policy(self.row_selection_policy);
346
347        let row_group_info = RowGroupInfo {
348            row_group_idx,
349            row_count,
350            plan_builder,
351            budget,
352        };
353
354        self.state = Some(RowGroupDecoderState::Start { row_group_info });
355        Ok(())
356    }
357
358    /// Try to build the next `ParquetRecordBatchReader` for the active row group.
359    ///
360    /// Returns [`RowGroupBuildResult::NeedsData`] if more data is needed,
361    /// [`RowGroupBuildResult::Data`] if a reader is ready, or
362    /// [`RowGroupBuildResult::Finished`] if the row group completed without
363    /// producing a reader.
364    pub(crate) fn try_build(&mut self) -> Result<RowGroupBuildResult, ParquetError> {
365        loop {
366            let current_state = self.take_state()?;
367            // Try to transition the decoder.
368            match self.try_transition(current_state)? {
369                // Either produced a batch reader, needed input, or finished
370                NextState {
371                    next_state,
372                    result: Some(result),
373                } => {
374                    // put back the next state
375                    self.state = Some(next_state);
376                    return Ok(result);
377                }
378                // completed one internal state, maybe can proceed further
379                NextState {
380                    next_state,
381                    result: None,
382                } => {
383                    // continue processing
384                    self.state = Some(next_state);
385                }
386            }
387        }
388    }
389
    /// Current state --> next state + optional output
    ///
    /// This is the main state transition function for the row group reader
    /// and encodes the row group decoding state machine.
    ///
    /// State flow: `Start` --> `Filters` <--> `WaitingOnFilterData` -->
    /// `StartData` --> `WaitingOnData` --> `Finished`
    ///
    /// # Notes
    ///
    /// This structure is used to reduce the indentation level of the main loop
    /// in try_build
    fn try_transition(
        &mut self,
        current_state: RowGroupDecoderState,
    ) -> Result<NextState, ParquetError> {
        let result = match current_state {
            // Decide whether predicates must be evaluated before reading output
            RowGroupDecoderState::Start { row_group_info } => {
                debug_assert!(
                    !row_group_info.budget.is_exhausted(),
                    "RowGroupFrontier should not hand off row groups after the output limit is exhausted"
                );

                let column_chunks = None; // no prior column chunks

                let Some(filter) = self.filter.take() else {
                    // no filter, start trying to read data immediately
                    return Ok(NextState::again(RowGroupDecoderState::StartData {
                        row_group_info,
                        column_chunks,
                        cache_info: None,
                    }));
                };
                // no predicates in filter, so start reading immediately
                if filter.predicates.is_empty() {
                    return Ok(NextState::again(RowGroupDecoderState::StartData {
                        row_group_info,
                        column_chunks,
                        cache_info: None,
                    }));
                };

                // we have predicates to evaluate
                let cache_projection =
                    self.compute_cache_projection(row_group_info.row_group_idx, &filter);

                // Cache decoded predicate columns that also appear in the
                // output projection so they need not be decoded twice
                let cache_info = CacheInfo::new(
                    cache_projection,
                    Arc::new(RwLock::new(RowGroupCache::new(
                        self.batch_size,
                        self.max_predicate_cache_size,
                    ))),
                );

                let filter_info = FilterInfo::new(filter, cache_info);
                NextState::again(RowGroupDecoderState::Filters {
                    row_group_info,
                    filter_info,
                    column_chunks,
                })
            }
            // need to evaluate filters
            RowGroupDecoderState::Filters {
                row_group_info,
                column_chunks,
                filter_info,
            } => {
                let RowGroupInfo {
                    row_group_idx,
                    row_count,
                    plan_builder,
                    budget,
                } = row_group_info;

                // If nothing is selected, we are done with this row group
                if !plan_builder.selects_any() {
                    // ruled out entire row group; put the filter back so later
                    // row groups can still use it
                    self.filter = Some(filter_info.into_filter());
                    return Ok(NextState::result(
                        RowGroupDecoderState::Finished,
                        RowGroupBuildResult::Finished {
                            remaining_budget: budget,
                        },
                    ));
                }

                // Make a request for the data needed to evaluate the current predicate
                let predicate = filter_info.current();

                // need to fetch pages the column needs for decoding, figure
                // that out based on the current selection and projection
                let data_request = DataRequestBuilder::new(
                    row_group_idx,
                    row_count,
                    self.batch_size,
                    &self.metadata,
                    predicate.projection(), // use the predicate's projection
                )
                .with_selection(plan_builder.selection())
                // Fetch predicate columns; expand selection only for cached predicate columns
                .with_cache_projection(Some(filter_info.cache_projection()))
                .with_column_chunks(column_chunks)
                .build();

                let row_group_info = RowGroupInfo {
                    row_group_idx,
                    row_count,
                    plan_builder,
                    budget,
                };

                NextState::again(RowGroupDecoderState::WaitingOnFilterData {
                    row_group_info,
                    filter_info,
                    data_request,
                })
            }
            // Have a pending request for predicate data; evaluate the
            // predicate once all the requested bytes have arrived
            RowGroupDecoderState::WaitingOnFilterData {
                row_group_info,
                data_request,
                mut filter_info,
            } => {
                // figure out what ranges we still need
                let needed_ranges = data_request.needed_ranges(&self.buffers);
                if !needed_ranges.is_empty() {
                    // still need data
                    return Ok(NextState::result(
                        RowGroupDecoderState::WaitingOnFilterData {
                            row_group_info,
                            filter_info,
                            data_request,
                        },
                        RowGroupBuildResult::NeedsData(needed_ranges),
                    ));
                }

                // otherwise we have all the data we need to evaluate the predicate
                let RowGroupInfo {
                    row_group_idx,
                    row_count,
                    mut plan_builder,
                    budget,
                } = row_group_info;

                let predicate = filter_info.current();

                // Assemble the buffered bytes into an in-memory row group for decoding
                let row_group = data_request.try_into_in_memory_row_group(
                    row_group_idx,
                    row_count,
                    &self.metadata,
                    predicate.projection(),
                    &mut self.buffers,
                )?;

                // Producer side of the cache: stores decoded predicate columns
                let cache_options = filter_info.cache_builder().producer();

                let array_reader = ArrayReaderBuilder::new(&row_group, &self.metrics)
                    .with_batch_size(self.batch_size)
                    .with_cache_options(Some(&cache_options))
                    .with_parquet_metadata(&self.metadata)
                    .build_array_reader(self.fields.as_deref(), predicate.projection())?;

                // Reset to original policy before each predicate so the override
                // can detect page skipping for THIS predicate's columns.
                // Without this reset, a prior predicate's override (e.g. Mask)
                // carries forward and the check returns early, missing unfetched
                // pages for subsequent predicates.
                plan_builder = plan_builder.with_row_selection_policy(self.row_selection_policy);

                // Prepare to evaluate the filter.
                // Note: first update the selection strategy to properly handle any pages
                // pruned during fetch
                plan_builder = override_selector_strategy_if_needed(
                    plan_builder,
                    predicate.projection(),
                    self.row_group_offset_index(row_group_idx),
                );

                // When this is the final predicate in the chain and an output
                // limit is set, tell the filter evaluation to stop once enough
                // matching rows have been accumulated.
                let predicate_limit = filter_info
                    .is_last()
                    .then(|| budget.selected_row_limit())
                    .flatten();

                // Evaluate the filter via `with_predicate_options`, opting into
                // early termination when this is the final predicate and an
                // output limit was set.
                let mut predicate_options =
                    PredicateOptions::new(array_reader, filter_info.current_mut());
                if let Some(limit) = predicate_limit {
                    predicate_options = predicate_options.with_limit(limit, row_count);
                }
                plan_builder = plan_builder.with_predicate_options(predicate_options)?;

                let row_group_info = RowGroupInfo {
                    row_group_idx,
                    row_count,
                    plan_builder,
                    budget,
                };

                // Take back the column chunks that were read
                let column_chunks = Some(row_group.column_chunks);

                // advance to the next predicate, if any
                match filter_info.advance() {
                    AdvanceResult::Continue(filter_info) => {
                        NextState::again(RowGroupDecoderState::Filters {
                            row_group_info,
                            column_chunks,
                            filter_info,
                        })
                    }
                    // done with predicates, proceed to reading data
                    AdvanceResult::Done(filter, cache_info) => {
                        // remember we need to put back the filter
                        assert!(self.filter.is_none());
                        self.filter = Some(filter);
                        NextState::again(RowGroupDecoderState::StartData {
                            row_group_info,
                            column_chunks,
                            cache_info: Some(cache_info),
                        })
                    }
                }
            }
            // All predicates (if any) are evaluated; apply the offset/limit
            // budget and request the bytes for the output projection
            RowGroupDecoderState::StartData {
                row_group_info,
                column_chunks,
                cache_info,
            } => {
                let RowGroupInfo {
                    row_group_idx,
                    row_count,
                    plan_builder,
                    budget,
                } = row_group_info;

                let BudgetedReadPlan {
                    mut plan_builder,
                    rows_before_budget,
                    rows_after_budget,
                    remaining_budget,
                } = budget.apply_to_plan(plan_builder, row_count);

                if rows_before_budget == 0 {
                    // ruled out entire row group
                    return Ok(NextState::result(
                        RowGroupDecoderState::Finished,
                        RowGroupBuildResult::Finished { remaining_budget },
                    ));
                }

                if rows_after_budget == 0 {
                    // no rows left after applying limit/offset
                    return Ok(NextState::result(
                        RowGroupDecoderState::Finished,
                        RowGroupBuildResult::Finished { remaining_budget },
                    ));
                }

                let data_request = DataRequestBuilder::new(
                    row_group_idx,
                    row_count,
                    self.batch_size,
                    &self.metadata,
                    &self.projection,
                )
                .with_selection(plan_builder.selection())
                .with_column_chunks(column_chunks)
                // Final projection fetch shouldn't expand selection for cache
                // so don't call with_cache_projection here
                .build();

                plan_builder = plan_builder.with_row_selection_policy(self.row_selection_policy);

                // Handle any pages pruned during fetch (see
                // `override_selector_strategy_if_needed` docs)
                plan_builder = override_selector_strategy_if_needed(
                    plan_builder,
                    &self.projection,
                    self.row_group_offset_index(row_group_idx),
                );

                let row_group_info = RowGroupInfo {
                    row_group_idx,
                    row_count,
                    plan_builder,
                    budget: remaining_budget,
                };

                NextState::again(RowGroupDecoderState::WaitingOnData {
                    row_group_info,
                    data_request,
                    cache_info,
                })
            }
            // Waiting on data to proceed with reading the output
            RowGroupDecoderState::WaitingOnData {
                row_group_info,
                data_request,
                cache_info,
            } => {
                let needed_ranges = data_request.needed_ranges(&self.buffers);
                if !needed_ranges.is_empty() {
                    // still need data
                    return Ok(NextState::result(
                        RowGroupDecoderState::WaitingOnData {
                            row_group_info,
                            data_request,
                            cache_info,
                        },
                        RowGroupBuildResult::NeedsData(needed_ranges),
                    ));
                }

                // otherwise we have all the data we need to proceed
                let RowGroupInfo {
                    row_group_idx,
                    row_count,
                    plan_builder,
                    budget,
                } = row_group_info;

                let row_group = data_request.try_into_in_memory_row_group(
                    row_group_idx,
                    row_count,
                    &self.metadata,
                    &self.projection,
                    &mut self.buffers,
                )?;

                let plan = plan_builder.build();

                // if we have any cached results, connect them up
                // (consumer side of the cache populated during filtering)
                let array_reader_builder = ArrayReaderBuilder::new(&row_group, &self.metrics)
                    .with_batch_size(self.batch_size)
                    .with_parquet_metadata(&self.metadata);
                let array_reader = if let Some(cache_info) = cache_info.as_ref() {
                    let cache_options: CacheOptions = cache_info.builder().consumer();
                    array_reader_builder
                        .with_cache_options(Some(&cache_options))
                        .build_array_reader(self.fields.as_deref(), &self.projection)
                } else {
                    array_reader_builder
                        .build_array_reader(self.fields.as_deref(), &self.projection)
                }?;

                let reader = ParquetRecordBatchReader::new(array_reader, plan);
                NextState::result(
                    RowGroupDecoderState::Finished,
                    RowGroupBuildResult::Data {
                        batch_reader: reader,
                        remaining_budget: budget,
                    },
                )
            }
            RowGroupDecoderState::Finished => {
                return Err(ParquetError::General(String::from(
                    "Internal Error: try_build called without an active row group",
                )));
            }
        };
        Ok(result)
    }
752
753    /// Which columns should be cached?
754    ///
755    /// Returns the columns that are used by the filters *and* then used in the
756    /// final projection, excluding any nested columns.
757    fn compute_cache_projection(&self, row_group_idx: usize, filter: &RowFilter) -> ProjectionMask {
758        let meta = self.metadata.row_group(row_group_idx);
759        match self.compute_cache_projection_inner(filter) {
760            Some(projection) => projection,
761            None => ProjectionMask::none(meta.columns().len()),
762        }
763    }
764
765    fn compute_cache_projection_inner(&self, filter: &RowFilter) -> Option<ProjectionMask> {
766        // Do not compute the projection mask if the predicate cache is disabled
767        if self.max_predicate_cache_size == 0 {
768            return None;
769        }
770        let mut cache_projection = filter.predicates.first()?.projection().clone();
771        for predicate in filter.predicates.iter() {
772            cache_projection.union(predicate.projection());
773        }
774        cache_projection.intersect(&self.projection);
775        self.exclude_nested_columns_from_cache(&cache_projection)
776    }
777
778    /// Exclude leaves belonging to roots that span multiple parquet leaves (i.e. nested columns)
779    fn exclude_nested_columns_from_cache(&self, mask: &ProjectionMask) -> Option<ProjectionMask> {
780        mask.without_nested_types(self.metadata.file_metadata().schema_descr())
781    }
782
783    /// Get the offset index for the specified row group, if any
784    fn row_group_offset_index(&self, row_group_idx: usize) -> Option<&[OffsetIndexMetaData]> {
785        self.metadata
786            .offset_index()
787            .filter(|index| !index.is_empty())
788            .and_then(|index| index.get(row_group_idx))
789            .map(|columns| columns.as_slice())
790    }
791}
792
793/// Override the selection strategy if needed.
794///
795/// Some pages can be skipped during row-group construction if they are not read
796/// by the selections. This means that the data pages for those rows are never
797/// loaded and definition/repetition levels are never read. When using
798/// `RowSelections` selection works because `skip_records()` handles this
799/// case and skips the page accordingly.
800///
801/// However, with the current mask design, all values must be read and decoded
802/// and then a mask filter is applied. Thus if any pages are skipped during
803/// row-group construction, the data pages are missing and cannot be decoded.
804///
805/// A simple example:
806/// * the page size is 2, the mask is 100001, row selection should be read(1) skip(4) read(1)
807/// * the `ColumnChunkData` would be page1(10), page2(skipped), page3(01)
808///
809/// Using the row selection to skip(4), page2 won't be read at all, so in this
810/// case we can't decode all the rows and apply a mask. To correctly apply the
811/// bit mask, we need all 6 values be read, but page2 is not in memory.
812fn override_selector_strategy_if_needed(
813    plan_builder: ReadPlanBuilder,
814    projection_mask: &ProjectionMask,
815    offset_index: Option<&[OffsetIndexMetaData]>,
816) -> ReadPlanBuilder {
817    // override only applies to Auto policy, If the policy is already Mask or Selectors, respect that
818    let RowSelectionPolicy::Auto { .. } = plan_builder.row_selection_policy() else {
819        return plan_builder;
820    };
821
822    let preferred_strategy = plan_builder.resolve_selection_strategy();
823
824    let force_selectors = matches!(preferred_strategy, RowSelectionStrategy::Mask)
825        && plan_builder.selection().is_some_and(|selection| {
826            selection.should_force_selectors(projection_mask, offset_index)
827        });
828
829    let resolved_strategy = if force_selectors {
830        RowSelectionStrategy::Selectors
831    } else {
832        preferred_strategy
833    };
834
835    // override the plan builder strategy with the resolved one
836    let new_policy = match resolved_strategy {
837        RowSelectionStrategy::Mask => RowSelectionPolicy::Mask,
838        RowSelectionStrategy::Selectors => RowSelectionPolicy::Selectors,
839    };
840
841    plan_builder.with_row_selection_policy(new_policy)
842}
843
#[cfg(test)]
mod tests {
    use super::*;
    use crate::arrow::arrow_reader::{RowSelection, RowSelector};

    /// Verify that the size of RowGroupDecoderState does not grow too large
    #[test]
    fn test_structure_size() {
        assert_eq!(std::mem::size_of::<RowGroupDecoderState>(), 232);
    }

    #[test]
    fn test_row_budget_offset_limit_across_row_groups() {
        // First row group: all 200 rows are consumed by the offset of 225,
        // leaving an offset of 25 and the full limit of 20 for later groups.
        let plan = ReadPlanBuilder::new(1024);
        let first = RowBudget::new(Some(225), Some(20)).apply_to_plan(plan, 200);
        assert_eq!(first.rows_before_budget, 200);
        assert_eq!(first.rows_after_budget, 0);
        assert_eq!(first.remaining_budget, RowBudget::new(Some(25), Some(20)));
        assert_eq!(first.plan_builder.num_rows_selected(), Some(0));

        // Second row group: the remaining offset is exhausted mid-group and
        // the limit of 20 rows then applies, leaving both budgets at zero.
        let plan = ReadPlanBuilder::new(1024);
        let second = first.remaining_budget.apply_to_plan(plan, 200);
        assert_eq!(second.rows_before_budget, 200);
        assert_eq!(second.rows_after_budget, 20);
        assert_eq!(second.remaining_budget, RowBudget::new(Some(0), Some(0)));
        assert_eq!(second.plan_builder.num_rows_selected(), Some(20));
    }

    #[test]
    fn test_row_budget_limit_only() {
        // No offset: only the limit of 20 rows constrains the selection
        let plan = ReadPlanBuilder::new(1024);
        let budgeted = RowBudget::new(None, Some(20)).apply_to_plan(plan, 200);
        assert_eq!(budgeted.rows_before_budget, 200);
        assert_eq!(budgeted.rows_after_budget, 20);
        assert_eq!(budgeted.remaining_budget, RowBudget::new(None, Some(0)));
        assert_eq!(budgeted.plan_builder.num_rows_selected(), Some(20));
    }

    #[test]
    fn test_row_budget_empty_selection() {
        // A selection that skips every row selects nothing, so neither the
        // offset nor the limit budget is consumed.
        let skip_all = RowSelection::from(vec![RowSelector::skip(200)]);
        let plan = ReadPlanBuilder::new(1024).with_selection(Some(skip_all));
        let budgeted = RowBudget::new(Some(10), Some(20)).apply_to_plan(plan, 200);
        assert_eq!(budgeted.rows_before_budget, 0);
        assert_eq!(budgeted.rows_after_budget, 0);
        assert_eq!(
            budgeted.remaining_budget,
            RowBudget::new(Some(10), Some(20))
        );
        assert_eq!(budgeted.plan_builder.num_rows_selected(), Some(0));
    }
}