Skip to main content

parquet/arrow/push_decoder/
remaining.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::DecodeResult;
19use crate::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection};
20use crate::arrow::push_decoder::reader_builder::{
21    RowBudget, RowGroupBuildResult, RowGroupReaderBuilder,
22};
23use crate::errors::ParquetError;
24use crate::file::metadata::ParquetMetaData;
25use bytes::Bytes;
26use std::collections::VecDeque;
27use std::ops::Range;
28use std::sync::Arc;
29
/// Plan for the row group at the front of the queue, computed after its slice
/// of the row selection has been split off.
///
/// Produced by `RowGroupFrontier::plan_selected_row_group`.
#[derive(Debug)]
enum QueuedRowGroupDecision {
    /// Hand this row group to the builder for decoding.
    Read(NextRowGroup),
    /// Skip this row group without decoding it, and keep scanning with
    /// `remaining_budget` as the updated offset/limit budget.
    Skip { remaining_budget: RowBudget },
}
38
/// Work item handed from [`RowGroupFrontier`] to [`RowGroupReaderBuilder`].
///
/// Produced by `RowGroupFrontier::next_readable_row_group` and unpacked by
/// `RemainingRowGroups::try_next_reader`.
#[derive(Debug)]
struct NextRowGroup {
    /// Index of the row group in the file metadata.
    row_group_idx: usize,
    /// Total number of rows in the row group, as recorded in the file metadata.
    row_count: usize,
    /// This row group's slice of the global selection, or `None` when all rows
    /// are selected (including when no global selection was provided).
    selection: Option<RowSelection>,
    /// Budget snapshot to apply while decoding this row group.
    budget: RowBudget,
}
50
/// Cross-row-group scan state: the queue of row groups not yet planned, the
/// unconsumed remainder of the global row selection, and the offset/limit
/// budget.
///
/// Picks the next row group worth decoding and skips queued row groups that
/// the selection or budget rule out.
#[derive(Debug)]
struct RowGroupFrontier {
    /// Metadata used to resolve row counts for queued row groups.
    parquet_metadata: Arc<ParquetMetaData>,
    /// Row group indices not yet handed to the builder, in scan order.
    row_groups: VecDeque<usize>,
    /// Cross-row-group cursor for the optional global row selection. Each
    /// planned row group splits its own rows off the front.
    selection: Option<RowSelection>,
    /// Offset/limit budget before the next readable row group is planned.
    budget: RowBudget,
    /// If predicates are present, row groups with selected rows must be read so
    /// the predicate can decide whether they are actually needed.
    has_predicates: bool,
}
65
66impl RowGroupFrontier {
67    fn new(
68        parquet_metadata: Arc<ParquetMetaData>,
69        row_groups: Vec<usize>,
70        selection: Option<RowSelection>,
71        budget: RowBudget,
72        has_predicates: bool,
73    ) -> Self {
74        Self {
75            parquet_metadata,
76            row_groups: VecDeque::from(row_groups),
77            selection,
78            budget,
79            has_predicates,
80        }
81    }
82
83    fn row_group_num_rows(&self, row_group_idx: usize) -> Result<usize, ParquetError> {
84        self.parquet_metadata
85            .row_group(row_group_idx)
86            .num_rows()
87            .try_into()
88            .map_err(|e| ParquetError::General(format!("Row count overflow: {e}")))
89    }
90
91    fn update_budget_after_row_group(&mut self, budget: RowBudget) {
92        self.budget = budget;
93    }
94
95    fn clear_remaining(&mut self) {
96        self.selection = None;
97        self.row_groups.clear();
98    }
99
100    /// Plan whether a selected row group should be read or skipped.
101    ///
102    /// Selection-only skips are handled before this method is called. This
103    /// method applies the remaining offset/limit budget and predicate
104    /// conservatism.
105    fn plan_selected_row_group(
106        &self,
107        next_row_group: NextRowGroup,
108        selected_rows: usize,
109    ) -> QueuedRowGroupDecision {
110        if self.has_predicates {
111            return QueuedRowGroupDecision::Read(next_row_group);
112        }
113
114        let rows_after_budget = self.budget.rows_after(selected_rows);
115        if rows_after_budget != 0 {
116            return QueuedRowGroupDecision::Read(next_row_group);
117        }
118
119        QueuedRowGroupDecision::Skip {
120            remaining_budget: self.budget.advance(selected_rows, rows_after_budget),
121        }
122    }
123
124    /// Advance queued row groups until one should be handed to the builder.
125    fn next_readable_row_group(&mut self) -> Result<Option<NextRowGroup>, ParquetError> {
126        loop {
127            let Some(&row_group_idx) = self.row_groups.front() else {
128                return Ok(None);
129            };
130            if self.budget.is_exhausted()
131                || self
132                    .selection
133                    .as_ref()
134                    .is_some_and(|selection| selection.row_count() == 0)
135            {
136                self.clear_remaining();
137                return Ok(None);
138            }
139
140            let row_count = self.row_group_num_rows(row_group_idx)?;
141            let (selection, selected_rows) = match self.selection.as_mut() {
142                Some(selection) => {
143                    let selection = selection.split_off(row_count);
144                    let selected_rows = selection.row_count();
145                    if selected_rows == 0 {
146                        self.row_groups.pop_front();
147                        continue;
148                    }
149
150                    let selection = if selected_rows == row_count {
151                        None
152                    } else {
153                        Some(selection)
154                    };
155                    (selection, selected_rows)
156                }
157                None => (None, row_count),
158            };
159
160            let next_row_group = NextRowGroup {
161                row_group_idx,
162                row_count,
163                selection,
164                budget: self.budget,
165            };
166
167            match self.plan_selected_row_group(next_row_group, selected_rows) {
168                QueuedRowGroupDecision::Read(next_row_group) => {
169                    self.row_groups.pop_front();
170                    return Ok(Some(next_row_group));
171                }
172                QueuedRowGroupDecision::Skip { remaining_budget } => {
173                    self.row_groups.pop_front();
174                    self.budget = remaining_budget;
175                }
176            }
177        }
178    }
179}
180
/// State machine that tracks the remaining high level chunks (row groups) of
/// Parquet data left to read.
///
/// [`RowGroupFrontier`] owns cross-row-group scan state and selects the next
/// work item. [`RowGroupReaderBuilder`] owns decoding for the active row group.
#[derive(Debug)]
pub(crate) struct RemainingRowGroups {
    /// Cross-row-group scan state for queued work.
    frontier: RowGroupFrontier,

    /// State for building the reader for the current row group. It also holds
    /// the data buffers pushed via `push_data`.
    row_group_reader_builder: RowGroupReaderBuilder,
}
194
195impl RemainingRowGroups {
196    pub fn new(
197        parquet_metadata: Arc<ParquetMetaData>,
198        row_groups: Vec<usize>,
199        selection: Option<RowSelection>,
200        budget: RowBudget,
201        has_predicates: bool,
202        row_group_reader_builder: RowGroupReaderBuilder,
203    ) -> Self {
204        Self {
205            frontier: RowGroupFrontier::new(
206                parquet_metadata,
207                row_groups,
208                selection,
209                budget,
210                has_predicates,
211            ),
212            row_group_reader_builder,
213        }
214    }
215
216    /// Push new data buffers that can be used to satisfy pending requests
217    pub fn push_data(&mut self, ranges: Vec<Range<u64>>, buffers: Vec<Bytes>) {
218        self.row_group_reader_builder.push_data(ranges, buffers);
219    }
220
221    /// Return the total number of bytes buffered so far
222    pub fn buffered_bytes(&self) -> u64 {
223        self.row_group_reader_builder.buffered_bytes()
224    }
225
226    /// Clear any staged ranges currently buffered for future decode work
227    pub fn clear_all_ranges(&mut self) {
228        self.row_group_reader_builder.clear_all_ranges();
229    }
230
231    /// returns [`ParquetRecordBatchReader`] suitable for reading the next
232    /// group of rows from the Parquet data, or the list of data ranges still
233    /// needed to proceed
234    pub fn try_next_reader(
235        &mut self,
236    ) -> Result<DecodeResult<ParquetRecordBatchReader>, ParquetError> {
237        loop {
238            if !self.row_group_reader_builder.has_active_row_group() {
239                // We are done with the previous row group, seek to the next one
240                // from the frontier, if any.
241
242                match self.frontier.next_readable_row_group()? {
243                    Some(NextRowGroup {
244                        row_group_idx,
245                        row_count,
246                        selection,
247                        budget,
248                    }) => {
249                        self.row_group_reader_builder.next_row_group(
250                            row_group_idx,
251                            row_count,
252                            selection,
253                            budget,
254                        )?;
255                    }
256                    None => return Ok(DecodeResult::Finished),
257                }
258            }
259
260            match self.row_group_reader_builder.try_build()? {
261                RowGroupBuildResult::Finished { remaining_budget } => {
262                    self.frontier
263                        .update_budget_after_row_group(remaining_budget);
264                    // reader is done, proceed to the next row group
265                }
266                RowGroupBuildResult::NeedsData(ranges) => {
267                    // need more data to proceed
268                    return Ok(DecodeResult::NeedsData(ranges));
269                }
270                RowGroupBuildResult::Data {
271                    batch_reader,
272                    remaining_budget,
273                } => {
274                    self.frontier
275                        .update_budget_after_row_group(remaining_budget);
276                    // ready to read the row group
277                    return Ok(DecodeResult::Data(batch_reader));
278                }
279            }
280        }
281    }
282}