Skip to main content

parquet/arrow/push_decoder/
remaining.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::DecodeResult;
19use crate::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection};
20use crate::arrow::push_decoder::reader_builder::{
21    RowBudget, RowGroupBuildResult, RowGroupReaderBuilder, RowGroupReaderBuilderParts,
22};
23use crate::errors::ParquetError;
24use crate::file::metadata::ParquetMetaData;
25use arrow_schema::SchemaRef;
26use bytes::Bytes;
27use std::collections::VecDeque;
28use std::ops::Range;
29use std::sync::Arc;
30
31/// Plan for the next queued row group after row-selection slicing.
32#[derive(Debug)]
33enum QueuedRowGroupDecision {
34    /// Hand this row group to the builder.
35    Read(NextRowGroup),
36    /// Skip this row group, and keep scanning with the updated budget.
37    Skip { remaining_budget: RowBudget },
38}
39
40/// Work item handed from [`RowGroupFrontier`] to [`RowGroupReaderBuilder`].
41#[derive(Debug)]
42struct NextRowGroup {
43    row_group_idx: usize,
44    row_count: usize,
45    /// This row group's slice of the global selection, or `None` when all rows
46    /// are selected.
47    selection: Option<RowSelection>,
48    /// Budget snapshot to apply while decoding this row group.
49    budget: RowBudget,
50}
51
52#[derive(Debug)]
53struct RowGroupFrontier {
54    /// Metadata used to resolve row counts for queued row groups.
55    parquet_metadata: Arc<ParquetMetaData>,
56    /// Row group indices not yet handed to the builder.
57    row_groups: VecDeque<usize>,
58    /// Cross-row-group cursor for the optional global row selection.
59    selection: Option<RowSelection>,
60    /// Offset/limit budget before the next readable row group is planned.
61    budget: RowBudget,
62    /// If predicates are present, row groups with selected rows must be read so
63    /// the predicate can decide whether they are actually needed.
64    has_predicates: bool,
65}
66
67impl RowGroupFrontier {
68    fn new(
69        parquet_metadata: Arc<ParquetMetaData>,
70        row_groups: Vec<usize>,
71        selection: Option<RowSelection>,
72        budget: RowBudget,
73        has_predicates: bool,
74    ) -> Self {
75        Self {
76            parquet_metadata,
77            row_groups: VecDeque::from(row_groups),
78            selection,
79            budget,
80            has_predicates,
81        }
82    }
83
84    fn row_group_num_rows(&self, row_group_idx: usize) -> Result<usize, ParquetError> {
85        self.parquet_metadata
86            .row_group(row_group_idx)
87            .num_rows()
88            .try_into()
89            .map_err(|e| ParquetError::General(format!("Row count overflow: {e}")))
90    }
91
92    fn update_budget_after_row_group(&mut self, budget: RowBudget) {
93        self.budget = budget;
94    }
95
96    fn clear_remaining(&mut self) {
97        self.selection = None;
98        self.row_groups.clear();
99    }
100
101    /// Plan whether a selected row group should be read or skipped.
102    ///
103    /// Selection-only skips are handled before this method is called. This
104    /// method applies the remaining offset/limit budget and predicate
105    /// conservatism.
106    fn plan_selected_row_group(
107        &self,
108        next_row_group: NextRowGroup,
109        selected_rows: usize,
110    ) -> QueuedRowGroupDecision {
111        if self.has_predicates {
112            return QueuedRowGroupDecision::Read(next_row_group);
113        }
114
115        let rows_after_budget = self.budget.rows_after(selected_rows);
116        if rows_after_budget != 0 {
117            return QueuedRowGroupDecision::Read(next_row_group);
118        }
119
120        QueuedRowGroupDecision::Skip {
121            remaining_budget: self.budget.advance(selected_rows, rows_after_budget),
122        }
123    }
124
125    /// Advance queued row groups until one should be handed to the builder.
126    fn next_readable_row_group(&mut self) -> Result<Option<NextRowGroup>, ParquetError> {
127        loop {
128            let Some(&row_group_idx) = self.row_groups.front() else {
129                return Ok(None);
130            };
131            if self.budget.is_exhausted()
132                || self
133                    .selection
134                    .as_ref()
135                    .is_some_and(|selection| selection.row_count() == 0)
136            {
137                self.clear_remaining();
138                return Ok(None);
139            }
140
141            let row_count = self.row_group_num_rows(row_group_idx)?;
142            let (selection, selected_rows) = match self.selection.as_mut() {
143                Some(selection) => {
144                    let selection = selection.split_off(row_count);
145                    let selected_rows = selection.row_count();
146                    if selected_rows == 0 {
147                        self.row_groups.pop_front();
148                        continue;
149                    }
150
151                    let selection = if selected_rows == row_count {
152                        None
153                    } else {
154                        Some(selection)
155                    };
156                    (selection, selected_rows)
157                }
158                None => (None, row_count),
159            };
160
161            let next_row_group = NextRowGroup {
162                row_group_idx,
163                row_count,
164                selection,
165                budget: self.budget,
166            };
167
168            match self.plan_selected_row_group(next_row_group, selected_rows) {
169                QueuedRowGroupDecision::Read(next_row_group) => {
170                    self.row_groups.pop_front();
171                    return Ok(Some(next_row_group));
172                }
173                QueuedRowGroupDecision::Skip { remaining_budget } => {
174                    self.row_groups.pop_front();
175                    self.budget = remaining_budget;
176                }
177            }
178        }
179    }
180}
181
182/// State machine that tracks the remaining high level chunks (row groups) of
183/// Parquet data left to read.
184///
185/// [`RowGroupFrontier`] owns cross-row-group scan state and selects the next
186/// work item. [`RowGroupReaderBuilder`] owns decoding for the active row group.
187#[derive(Debug)]
188pub(crate) struct RemainingRowGroups {
189    /// The arrow schema of the decoded output. Carried only so
190    /// [`Self::into_parts`] can hand it to a rebuilt builder; unused while
191    /// decoding.
192    schema: SchemaRef,
193
194    /// Cross-row-group scan state for queued work.
195    frontier: RowGroupFrontier,
196
197    /// State for building the reader for the current row group
198    row_group_reader_builder: RowGroupReaderBuilder,
199}
200
201/// The state recovered from a [`RemainingRowGroups`] by
202/// [`RemainingRowGroups::into_parts`], describing the row groups *not* yet
203/// decoded so a builder reconstructed from it resumes where the decoder left off.
204#[derive(Debug)]
205pub(crate) struct RemainingRowGroupsParts {
206    /// The arrow schema of the decoded output.
207    pub schema: SchemaRef,
208    /// The Parquet file metadata.
209    pub metadata: Arc<ParquetMetaData>,
210    /// Row groups not yet handed to the reader builder.
211    pub row_groups: Vec<usize>,
212    /// The not-yet-consumed slice of the global row selection.
213    pub selection: Option<RowSelection>,
214    /// Offset still to be skipped before the next readable row group.
215    pub offset: Option<usize>,
216    /// Output rows still permitted across the remaining row groups.
217    pub limit: Option<usize>,
218    /// Builder-configurable parts of the inner row-group reader builder.
219    pub reader_builder: RowGroupReaderBuilderParts,
220}
221
222impl RemainingRowGroups {
223    pub fn new(
224        schema: SchemaRef,
225        parquet_metadata: Arc<ParquetMetaData>,
226        row_groups: Vec<usize>,
227        selection: Option<RowSelection>,
228        budget: RowBudget,
229        has_predicates: bool,
230        row_group_reader_builder: RowGroupReaderBuilder,
231    ) -> Self {
232        Self {
233            schema,
234            frontier: RowGroupFrontier::new(
235                parquet_metadata,
236                row_groups,
237                selection,
238                budget,
239                has_predicates,
240            ),
241            row_group_reader_builder,
242        }
243    }
244
245    /// Decompose into [`RemainingRowGroupsParts`].
246    ///
247    /// Must be called at a row-group boundary (see
248    /// [`Self::is_at_row_group_boundary`]). The inner reader builder's runtime
249    /// decode state is discarded; its buffered bytes are carried through.
250    pub(crate) fn into_parts(self) -> RemainingRowGroupsParts {
251        let Self {
252            schema,
253            frontier,
254            row_group_reader_builder,
255        } = self;
256        // `has_predicates` is recomputed by `build()` from the filter.
257        let RowGroupFrontier {
258            parquet_metadata,
259            row_groups,
260            selection,
261            budget,
262            has_predicates: _,
263        } = frontier;
264        RemainingRowGroupsParts {
265            schema,
266            metadata: parquet_metadata,
267            row_groups: Vec::from(row_groups),
268            selection,
269            offset: budget.offset(),
270            limit: budget.limit(),
271            reader_builder: row_group_reader_builder.into_parts(),
272        }
273    }
274
275    /// Push new data buffers that can be used to satisfy pending requests
276    pub fn push_data(&mut self, ranges: Vec<Range<u64>>, buffers: Vec<Bytes>) {
277        self.row_group_reader_builder.push_data(ranges, buffers);
278    }
279
280    /// Return the total number of bytes buffered so far
281    pub fn buffered_bytes(&self) -> u64 {
282        self.row_group_reader_builder.buffered_bytes()
283    }
284
285    /// Clear any staged ranges currently buffered for future decode work
286    pub fn clear_all_ranges(&mut self) {
287        self.row_group_reader_builder.clear_all_ranges();
288    }
289
290    /// True iff the inner row-group reader is between row groups (state
291    /// `Finished`). Forward to [`RowGroupReaderBuilder::is_finished`].
292    pub fn is_at_row_group_boundary(&self) -> bool {
293        self.row_group_reader_builder.is_finished()
294    }
295
296    /// Number of row groups remaining (not including the one currently
297    /// being decoded).
298    pub fn row_groups_remaining(&self) -> usize {
299        self.frontier.row_groups.len()
300    }
301
302    /// returns [`ParquetRecordBatchReader`] suitable for reading the next
303    /// group of rows from the Parquet data, or the list of data ranges still
304    /// needed to proceed
305    pub fn try_next_reader(
306        &mut self,
307    ) -> Result<DecodeResult<ParquetRecordBatchReader>, ParquetError> {
308        loop {
309            if !self.row_group_reader_builder.has_active_row_group() {
310                // We are done with the previous row group, seek to the next one
311                // from the frontier, if any.
312
313                match self.frontier.next_readable_row_group()? {
314                    Some(NextRowGroup {
315                        row_group_idx,
316                        row_count,
317                        selection,
318                        budget,
319                    }) => {
320                        self.row_group_reader_builder.next_row_group(
321                            row_group_idx,
322                            row_count,
323                            selection,
324                            budget,
325                        )?;
326                    }
327                    None => return Ok(DecodeResult::Finished),
328                }
329            }
330
331            match self.row_group_reader_builder.try_build()? {
332                RowGroupBuildResult::Finished { remaining_budget } => {
333                    self.frontier
334                        .update_budget_after_row_group(remaining_budget);
335                    // reader is done, proceed to the next row group
336                }
337                RowGroupBuildResult::NeedsData(ranges) => {
338                    // need more data to proceed
339                    return Ok(DecodeResult::NeedsData(ranges));
340                }
341                RowGroupBuildResult::Data {
342                    batch_reader,
343                    remaining_budget,
344                } => {
345                    self.frontier
346                        .update_budget_after_row_group(remaining_budget);
347                    // ready to read the row group
348                    return Ok(DecodeResult::Data(batch_reader));
349                }
350            }
351        }
352    }
353}