Skip to main content

parquet/arrow/push_decoder/
remaining.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::DecodeResult;
19use crate::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection};
20use crate::arrow::push_decoder::reader_builder::{
21    RowBudget, RowGroupBuildResult, RowGroupReaderBuilder, RowGroupReaderBuilderParts,
22};
23use crate::errors::ParquetError;
24use crate::file::metadata::ParquetMetaData;
25use arrow_schema::SchemaRef;
26use bytes::Bytes;
27use std::collections::VecDeque;
28use std::ops::Range;
29use std::sync::Arc;
30
/// Plan for the next queued row group after row-selection slicing.
///
/// Produced by `RowGroupFrontier::plan_selected_row_group` and acted on by
/// `RowGroupFrontier::next_readable_row_group`.
#[derive(Debug)]
enum QueuedRowGroupDecision {
    /// Hand this row group to the builder.
    Read(NextRowGroup),
    /// Skip this row group, and keep scanning with the updated budget.
    ///
    /// `remaining_budget` replaces the frontier's budget before the next
    /// queued row group is examined.
    Skip { remaining_budget: RowBudget },
}
39
/// Work item handed from [`RowGroupFrontier`] to [`RowGroupReaderBuilder`].
///
/// The four fields are passed, in declaration order, to
/// `RowGroupReaderBuilder::next_row_group`.
#[derive(Debug)]
struct NextRowGroup {
    /// Index of the row group to decode, taken from the frontier's queue.
    row_group_idx: usize,
    /// Row count of the row group as recorded in the Parquet metadata
    /// (before any selection is applied).
    row_count: usize,
    /// This row group's slice of the global selection, or `None` when all rows
    /// are selected.
    selection: Option<RowSelection>,
    /// Budget snapshot to apply while decoding this row group.
    budget: RowBudget,
}
51
/// Cross-row-group scan state: the queue of row groups not yet handed to the
/// builder, plus the row selection and offset/limit budget carried across
/// them.
///
/// `Clone` is derived so that `peek_next_row_group` can run the real advance
/// logic on a throwaway copy without mutating the live frontier.
#[derive(Debug, Clone)]
struct RowGroupFrontier {
    /// Metadata used to resolve row counts for queued row groups.
    parquet_metadata: Arc<ParquetMetaData>,
    /// Row group indices not yet handed to the builder.
    row_groups: VecDeque<usize>,
    /// Cross-row-group cursor for the optional global row selection.
    selection: Option<RowSelection>,
    /// Offset/limit budget before the next readable row group is planned.
    budget: RowBudget,
    /// If predicates are present, row groups with selected rows must be read so
    /// the predicate can decide whether they are actually needed.
    has_predicates: bool,
}
66
67impl RowGroupFrontier {
68    fn new(
69        parquet_metadata: Arc<ParquetMetaData>,
70        row_groups: Vec<usize>,
71        selection: Option<RowSelection>,
72        budget: RowBudget,
73        has_predicates: bool,
74    ) -> Self {
75        Self {
76            parquet_metadata,
77            row_groups: VecDeque::from(row_groups),
78            selection,
79            budget,
80            has_predicates,
81        }
82    }
83
84    fn row_group_num_rows(&self, row_group_idx: usize) -> Result<usize, ParquetError> {
85        self.parquet_metadata
86            .row_group(row_group_idx)
87            .num_rows()
88            .try_into()
89            .map_err(|e| ParquetError::General(format!("Row count overflow: {e}")))
90    }
91
92    fn update_budget_after_row_group(&mut self, budget: RowBudget) {
93        self.budget = budget;
94    }
95
96    /// Peek at the next row-group index [`Self::next_readable_row_group`]
97    /// would hand out, without mutating any state. Returns `None` if every
98    /// remaining row group would be skipped under the current
99    /// selection/budget, or if the queue is empty.
100    ///
101    /// Runs the real [`Self::next_readable_row_group`] advance logic on a
102    /// throwaway clone of the frontier, so peek can never drift from the
103    /// read path. The clone copies the queued row-group indices and optional
104    /// row-selection (a `Vec<RowSelector>`); see
105    /// [`RemainingRowGroups::peek_next_row_group`].
106    fn peek_next_row_group(&self) -> Result<Option<usize>, ParquetError> {
107        Ok(self
108            .clone()
109            .next_readable_row_group()?
110            .map(|next_row_group| next_row_group.row_group_idx))
111    }
112
113    fn clear_remaining(&mut self) {
114        self.selection = None;
115        self.row_groups.clear();
116    }
117
118    /// Plan whether a selected row group should be read or skipped.
119    ///
120    /// Selection-only skips are handled before this method is called. This
121    /// method applies the remaining offset/limit budget and predicate
122    /// conservatism.
123    fn plan_selected_row_group(
124        &self,
125        next_row_group: NextRowGroup,
126        selected_rows: usize,
127    ) -> QueuedRowGroupDecision {
128        if self.has_predicates {
129            return QueuedRowGroupDecision::Read(next_row_group);
130        }
131
132        let rows_after_budget = self.budget.rows_after(selected_rows);
133        if rows_after_budget != 0 {
134            return QueuedRowGroupDecision::Read(next_row_group);
135        }
136
137        QueuedRowGroupDecision::Skip {
138            remaining_budget: self.budget.advance(selected_rows, rows_after_budget),
139        }
140    }
141
142    /// Advance queued row groups until one should be handed to the builder.
143    fn next_readable_row_group(&mut self) -> Result<Option<NextRowGroup>, ParquetError> {
144        loop {
145            let Some(&row_group_idx) = self.row_groups.front() else {
146                return Ok(None);
147            };
148            if self.budget.is_exhausted()
149                || self
150                    .selection
151                    .as_ref()
152                    .is_some_and(|selection| selection.row_count() == 0)
153            {
154                self.clear_remaining();
155                return Ok(None);
156            }
157
158            let row_count = self.row_group_num_rows(row_group_idx)?;
159            let (selection, selected_rows) = match self.selection.as_mut() {
160                Some(selection) => {
161                    let selection = selection.split_off(row_count);
162                    let selected_rows = selection.row_count();
163                    if selected_rows == 0 {
164                        self.row_groups.pop_front();
165                        continue;
166                    }
167
168                    let selection = if selected_rows == row_count {
169                        None
170                    } else {
171                        Some(selection)
172                    };
173                    (selection, selected_rows)
174                }
175                None => (None, row_count),
176            };
177
178            let next_row_group = NextRowGroup {
179                row_group_idx,
180                row_count,
181                selection,
182                budget: self.budget,
183            };
184
185            match self.plan_selected_row_group(next_row_group, selected_rows) {
186                QueuedRowGroupDecision::Read(next_row_group) => {
187                    self.row_groups.pop_front();
188                    return Ok(Some(next_row_group));
189                }
190                QueuedRowGroupDecision::Skip { remaining_budget } => {
191                    self.row_groups.pop_front();
192                    self.budget = remaining_budget;
193                }
194            }
195        }
196    }
197}
198
/// State machine that tracks the remaining high level chunks (row groups) of
/// Parquet data left to read.
///
/// [`RowGroupFrontier`] owns cross-row-group scan state and selects the next
/// work item. [`RowGroupReaderBuilder`] owns decoding for the active row group.
#[derive(Debug)]
pub(crate) struct RemainingRowGroups {
    /// The arrow schema of the decoded output. Carried only so
    /// [`Self::into_parts`] can hand it to a rebuilt builder; unused while
    /// decoding.
    schema: SchemaRef,

    /// Cross-row-group scan state for queued work: the row groups not yet
    /// handed to the builder, the remaining row selection, and the
    /// offset/limit budget.
    frontier: RowGroupFrontier,

    /// State for building the reader for the current row group. Also holds
    /// the data buffers supplied through [`Self::push_data`].
    row_group_reader_builder: RowGroupReaderBuilder,
}
217
/// The state recovered from a [`RemainingRowGroups`] by
/// [`RemainingRowGroups::into_parts`], describing the row groups *not* yet
/// decoded so a builder reconstructed from it resumes where the decoder left off.
#[derive(Debug)]
pub(crate) struct RemainingRowGroupsParts {
    /// The arrow schema of the decoded output.
    pub schema: SchemaRef,
    /// The Parquet file metadata.
    pub metadata: Arc<ParquetMetaData>,
    /// Row groups not yet handed to the reader builder, in queue order.
    pub row_groups: Vec<usize>,
    /// The not-yet-consumed slice of the global row selection.
    pub selection: Option<RowSelection>,
    /// Offset still to be skipped before the next readable row group
    /// (the `offset()` of the frontier's remaining budget).
    pub offset: Option<usize>,
    /// Output rows still permitted across the remaining row groups
    /// (the `limit()` of the frontier's remaining budget).
    pub limit: Option<usize>,
    /// Builder-configurable parts of the inner row-group reader builder.
    pub reader_builder: RowGroupReaderBuilderParts,
}
238
239impl RemainingRowGroups {
240    pub fn new(
241        schema: SchemaRef,
242        parquet_metadata: Arc<ParquetMetaData>,
243        row_groups: Vec<usize>,
244        selection: Option<RowSelection>,
245        budget: RowBudget,
246        has_predicates: bool,
247        row_group_reader_builder: RowGroupReaderBuilder,
248    ) -> Self {
249        Self {
250            schema,
251            frontier: RowGroupFrontier::new(
252                parquet_metadata,
253                row_groups,
254                selection,
255                budget,
256                has_predicates,
257            ),
258            row_group_reader_builder,
259        }
260    }
261
262    /// Decompose into [`RemainingRowGroupsParts`].
263    ///
264    /// Must be called at a row-group boundary (see
265    /// [`Self::is_at_row_group_boundary`]). The inner reader builder's runtime
266    /// decode state is discarded; its buffered bytes are carried through.
267    pub(crate) fn into_parts(self) -> RemainingRowGroupsParts {
268        let Self {
269            schema,
270            frontier,
271            row_group_reader_builder,
272        } = self;
273        // `has_predicates` is recomputed by `build()` from the filter.
274        let RowGroupFrontier {
275            parquet_metadata,
276            row_groups,
277            selection,
278            budget,
279            has_predicates: _,
280        } = frontier;
281        RemainingRowGroupsParts {
282            schema,
283            metadata: parquet_metadata,
284            row_groups: Vec::from(row_groups),
285            selection,
286            offset: budget.offset(),
287            limit: budget.limit(),
288            reader_builder: row_group_reader_builder.into_parts(),
289        }
290    }
291
292    /// Push new data buffers that can be used to satisfy pending requests
293    pub fn push_data(&mut self, ranges: Vec<Range<u64>>, buffers: Vec<Bytes>) {
294        self.row_group_reader_builder.push_data(ranges, buffers);
295    }
296
297    /// Return the total number of bytes buffered so far
298    pub fn buffered_bytes(&self) -> u64 {
299        self.row_group_reader_builder.buffered_bytes()
300    }
301
302    /// Clear any staged ranges currently buffered for future decode work
303    pub fn clear_all_ranges(&mut self) {
304        self.row_group_reader_builder.clear_all_ranges();
305    }
306
307    /// True iff the inner row-group reader is between row groups (state
308    /// `Finished`). Forward to [`RowGroupReaderBuilder::is_finished`].
309    pub fn is_at_row_group_boundary(&self) -> bool {
310        self.row_group_reader_builder.is_finished()
311    }
312
313    /// Number of row groups remaining (not including the one currently
314    /// being decoded).
315    pub fn row_groups_remaining(&self) -> usize {
316        self.frontier.row_groups.len()
317    }
318
319    /// Peek at the file-level row-group index that the next call to
320    /// [`Self::try_next_reader`] will produce a reader for, after
321    /// simulating the same skip logic [`Self::try_next_reader`] applies
322    /// internally (row-selection emptiness + offset/limit budget). Does
323    /// not mutate state.
324    ///
325    /// Returns `None` when the active row group is still being decoded,
326    /// when no row groups remain, or when every remaining row group
327    /// would be skipped under the current selection/budget.
328    ///
329    /// Cost: one clone of the queued row-group indices and optional
330    /// row-selection per call (the frontier is cloned so the real advance
331    /// logic can run non-destructively). For callers that peek once per
332    /// row-group boundary this is O(remaining row groups + selectors) per
333    /// boundary.
334    pub fn peek_next_row_group(&self) -> Result<Option<usize>, ParquetError> {
335        if self.row_group_reader_builder.has_active_row_group() {
336            return Ok(None);
337        }
338        self.frontier.peek_next_row_group()
339    }
340
341    /// returns [`ParquetRecordBatchReader`] suitable for reading the next
342    /// group of rows from the Parquet data, or the list of data ranges still
343    /// needed to proceed
344    pub fn try_next_reader(
345        &mut self,
346    ) -> Result<DecodeResult<ParquetRecordBatchReader>, ParquetError> {
347        loop {
348            if !self.row_group_reader_builder.has_active_row_group() {
349                // We are done with the previous row group, seek to the next one
350                // from the frontier, if any.
351
352                match self.frontier.next_readable_row_group()? {
353                    Some(NextRowGroup {
354                        row_group_idx,
355                        row_count,
356                        selection,
357                        budget,
358                    }) => {
359                        self.row_group_reader_builder.next_row_group(
360                            row_group_idx,
361                            row_count,
362                            selection,
363                            budget,
364                        )?;
365                    }
366                    None => return Ok(DecodeResult::Finished),
367                }
368            }
369
370            match self.row_group_reader_builder.try_build()? {
371                RowGroupBuildResult::Finished { remaining_budget } => {
372                    self.frontier
373                        .update_budget_after_row_group(remaining_budget);
374                    // reader is done, proceed to the next row group
375                }
376                RowGroupBuildResult::NeedsData(ranges) => {
377                    // need more data to proceed
378                    return Ok(DecodeResult::NeedsData(ranges));
379                }
380                RowGroupBuildResult::Data {
381                    batch_reader,
382                    remaining_budget,
383                } => {
384                    self.frontier
385                        .update_budget_after_row_group(remaining_budget);
386                    // ready to read the row group
387                    return Ok(DecodeResult::Data(batch_reader));
388                }
389            }
390        }
391    }
392}