parquet/arrow/push_decoder/remaining.rs
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use crate::DecodeResult;
19use crate::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection};
20use crate::arrow::push_decoder::reader_builder::{
21 RowBudget, RowGroupBuildResult, RowGroupReaderBuilder, RowGroupReaderBuilderParts,
22};
23use crate::errors::ParquetError;
24use crate::file::metadata::ParquetMetaData;
25use arrow_schema::SchemaRef;
26use bytes::Bytes;
27use std::collections::VecDeque;
28use std::ops::Range;
29use std::sync::Arc;
30
31/// Plan for the next queued row group after row-selection slicing.
32#[derive(Debug)]
33enum QueuedRowGroupDecision {
34 /// Hand this row group to the builder.
35 Read(NextRowGroup),
36 /// Skip this row group, and keep scanning with the updated budget.
37 Skip { remaining_budget: RowBudget },
38}
39
40/// Work item handed from [`RowGroupFrontier`] to [`RowGroupReaderBuilder`].
41#[derive(Debug)]
42struct NextRowGroup {
43 row_group_idx: usize,
44 row_count: usize,
45 /// This row group's slice of the global selection, or `None` when all rows
46 /// are selected.
47 selection: Option<RowSelection>,
48 /// Budget snapshot to apply while decoding this row group.
49 budget: RowBudget,
50}
51
52#[derive(Debug, Clone)]
53struct RowGroupFrontier {
54 /// Metadata used to resolve row counts for queued row groups.
55 parquet_metadata: Arc<ParquetMetaData>,
56 /// Row group indices not yet handed to the builder.
57 row_groups: VecDeque<usize>,
58 /// Cross-row-group cursor for the optional global row selection.
59 selection: Option<RowSelection>,
60 /// Offset/limit budget before the next readable row group is planned.
61 budget: RowBudget,
62 /// If predicates are present, row groups with selected rows must be read so
63 /// the predicate can decide whether they are actually needed.
64 has_predicates: bool,
65}
66
67impl RowGroupFrontier {
68 fn new(
69 parquet_metadata: Arc<ParquetMetaData>,
70 row_groups: Vec<usize>,
71 selection: Option<RowSelection>,
72 budget: RowBudget,
73 has_predicates: bool,
74 ) -> Self {
75 Self {
76 parquet_metadata,
77 row_groups: VecDeque::from(row_groups),
78 selection,
79 budget,
80 has_predicates,
81 }
82 }
83
84 fn row_group_num_rows(&self, row_group_idx: usize) -> Result<usize, ParquetError> {
85 self.parquet_metadata
86 .row_group(row_group_idx)
87 .num_rows()
88 .try_into()
89 .map_err(|e| ParquetError::General(format!("Row count overflow: {e}")))
90 }
91
92 fn update_budget_after_row_group(&mut self, budget: RowBudget) {
93 self.budget = budget;
94 }
95
96 /// Peek at the next row-group index [`Self::next_readable_row_group`]
97 /// would hand out, without mutating any state. Returns `None` if every
98 /// remaining row group would be skipped under the current
99 /// selection/budget, or if the queue is empty.
100 ///
101 /// Runs the real [`Self::next_readable_row_group`] advance logic on a
102 /// throwaway clone of the frontier, so peek can never drift from the
103 /// read path. The clone copies the queued row-group indices and optional
104 /// row-selection (a `Vec<RowSelector>`); see
105 /// [`RemainingRowGroups::peek_next_row_group`].
106 fn peek_next_row_group(&self) -> Result<Option<usize>, ParquetError> {
107 Ok(self
108 .clone()
109 .next_readable_row_group()?
110 .map(|next_row_group| next_row_group.row_group_idx))
111 }
112
113 fn clear_remaining(&mut self) {
114 self.selection = None;
115 self.row_groups.clear();
116 }
117
118 /// Plan whether a selected row group should be read or skipped.
119 ///
120 /// Selection-only skips are handled before this method is called. This
121 /// method applies the remaining offset/limit budget and predicate
122 /// conservatism.
123 fn plan_selected_row_group(
124 &self,
125 next_row_group: NextRowGroup,
126 selected_rows: usize,
127 ) -> QueuedRowGroupDecision {
128 if self.has_predicates {
129 return QueuedRowGroupDecision::Read(next_row_group);
130 }
131
132 let rows_after_budget = self.budget.rows_after(selected_rows);
133 if rows_after_budget != 0 {
134 return QueuedRowGroupDecision::Read(next_row_group);
135 }
136
137 QueuedRowGroupDecision::Skip {
138 remaining_budget: self.budget.advance(selected_rows, rows_after_budget),
139 }
140 }
141
142 /// Advance queued row groups until one should be handed to the builder.
143 fn next_readable_row_group(&mut self) -> Result<Option<NextRowGroup>, ParquetError> {
144 loop {
145 let Some(&row_group_idx) = self.row_groups.front() else {
146 return Ok(None);
147 };
148 if self.budget.is_exhausted()
149 || self
150 .selection
151 .as_ref()
152 .is_some_and(|selection| selection.row_count() == 0)
153 {
154 self.clear_remaining();
155 return Ok(None);
156 }
157
158 let row_count = self.row_group_num_rows(row_group_idx)?;
159 let (selection, selected_rows) = match self.selection.as_mut() {
160 Some(selection) => {
161 let selection = selection.split_off(row_count);
162 let selected_rows = selection.row_count();
163 if selected_rows == 0 {
164 self.row_groups.pop_front();
165 continue;
166 }
167
168 let selection = if selected_rows == row_count {
169 None
170 } else {
171 Some(selection)
172 };
173 (selection, selected_rows)
174 }
175 None => (None, row_count),
176 };
177
178 let next_row_group = NextRowGroup {
179 row_group_idx,
180 row_count,
181 selection,
182 budget: self.budget,
183 };
184
185 match self.plan_selected_row_group(next_row_group, selected_rows) {
186 QueuedRowGroupDecision::Read(next_row_group) => {
187 self.row_groups.pop_front();
188 return Ok(Some(next_row_group));
189 }
190 QueuedRowGroupDecision::Skip { remaining_budget } => {
191 self.row_groups.pop_front();
192 self.budget = remaining_budget;
193 }
194 }
195 }
196 }
197}
198
199/// State machine that tracks the remaining high level chunks (row groups) of
200/// Parquet data left to read.
201///
202/// [`RowGroupFrontier`] owns cross-row-group scan state and selects the next
203/// work item. [`RowGroupReaderBuilder`] owns decoding for the active row group.
204#[derive(Debug)]
205pub(crate) struct RemainingRowGroups {
206 /// The arrow schema of the decoded output. Carried only so
207 /// [`Self::into_parts`] can hand it to a rebuilt builder; unused while
208 /// decoding.
209 schema: SchemaRef,
210
211 /// Cross-row-group scan state for queued work.
212 frontier: RowGroupFrontier,
213
214 /// State for building the reader for the current row group
215 row_group_reader_builder: RowGroupReaderBuilder,
216}
217
218/// The state recovered from a [`RemainingRowGroups`] by
219/// [`RemainingRowGroups::into_parts`], describing the row groups *not* yet
220/// decoded so a builder reconstructed from it resumes where the decoder left off.
221#[derive(Debug)]
222pub(crate) struct RemainingRowGroupsParts {
223 /// The arrow schema of the decoded output.
224 pub schema: SchemaRef,
225 /// The Parquet file metadata.
226 pub metadata: Arc<ParquetMetaData>,
227 /// Row groups not yet handed to the reader builder.
228 pub row_groups: Vec<usize>,
229 /// The not-yet-consumed slice of the global row selection.
230 pub selection: Option<RowSelection>,
231 /// Offset still to be skipped before the next readable row group.
232 pub offset: Option<usize>,
233 /// Output rows still permitted across the remaining row groups.
234 pub limit: Option<usize>,
235 /// Builder-configurable parts of the inner row-group reader builder.
236 pub reader_builder: RowGroupReaderBuilderParts,
237}
238
239impl RemainingRowGroups {
240 pub fn new(
241 schema: SchemaRef,
242 parquet_metadata: Arc<ParquetMetaData>,
243 row_groups: Vec<usize>,
244 selection: Option<RowSelection>,
245 budget: RowBudget,
246 has_predicates: bool,
247 row_group_reader_builder: RowGroupReaderBuilder,
248 ) -> Self {
249 Self {
250 schema,
251 frontier: RowGroupFrontier::new(
252 parquet_metadata,
253 row_groups,
254 selection,
255 budget,
256 has_predicates,
257 ),
258 row_group_reader_builder,
259 }
260 }
261
262 /// Decompose into [`RemainingRowGroupsParts`].
263 ///
264 /// Must be called at a row-group boundary (see
265 /// [`Self::is_at_row_group_boundary`]). The inner reader builder's runtime
266 /// decode state is discarded; its buffered bytes are carried through.
267 pub(crate) fn into_parts(self) -> RemainingRowGroupsParts {
268 let Self {
269 schema,
270 frontier,
271 row_group_reader_builder,
272 } = self;
273 // `has_predicates` is recomputed by `build()` from the filter.
274 let RowGroupFrontier {
275 parquet_metadata,
276 row_groups,
277 selection,
278 budget,
279 has_predicates: _,
280 } = frontier;
281 RemainingRowGroupsParts {
282 schema,
283 metadata: parquet_metadata,
284 row_groups: Vec::from(row_groups),
285 selection,
286 offset: budget.offset(),
287 limit: budget.limit(),
288 reader_builder: row_group_reader_builder.into_parts(),
289 }
290 }
291
292 /// Push new data buffers that can be used to satisfy pending requests
293 pub fn push_data(&mut self, ranges: Vec<Range<u64>>, buffers: Vec<Bytes>) {
294 self.row_group_reader_builder.push_data(ranges, buffers);
295 }
296
297 /// Return the total number of bytes buffered so far
298 pub fn buffered_bytes(&self) -> u64 {
299 self.row_group_reader_builder.buffered_bytes()
300 }
301
302 /// Clear any staged ranges currently buffered for future decode work
303 pub fn clear_all_ranges(&mut self) {
304 self.row_group_reader_builder.clear_all_ranges();
305 }
306
307 /// True iff the inner row-group reader is between row groups (state
308 /// `Finished`). Forward to [`RowGroupReaderBuilder::is_finished`].
309 pub fn is_at_row_group_boundary(&self) -> bool {
310 self.row_group_reader_builder.is_finished()
311 }
312
313 /// Number of row groups remaining (not including the one currently
314 /// being decoded).
315 pub fn row_groups_remaining(&self) -> usize {
316 self.frontier.row_groups.len()
317 }
318
319 /// Peek at the file-level row-group index that the next call to
320 /// [`Self::try_next_reader`] will produce a reader for, after
321 /// simulating the same skip logic [`Self::try_next_reader`] applies
322 /// internally (row-selection emptiness + offset/limit budget). Does
323 /// not mutate state.
324 ///
325 /// Returns `None` when the active row group is still being decoded,
326 /// when no row groups remain, or when every remaining row group
327 /// would be skipped under the current selection/budget.
328 ///
329 /// Cost: one clone of the queued row-group indices and optional
330 /// row-selection per call (the frontier is cloned so the real advance
331 /// logic can run non-destructively). For callers that peek once per
332 /// row-group boundary this is O(remaining row groups + selectors) per
333 /// boundary.
334 pub fn peek_next_row_group(&self) -> Result<Option<usize>, ParquetError> {
335 if self.row_group_reader_builder.has_active_row_group() {
336 return Ok(None);
337 }
338 self.frontier.peek_next_row_group()
339 }
340
341 /// returns [`ParquetRecordBatchReader`] suitable for reading the next
342 /// group of rows from the Parquet data, or the list of data ranges still
343 /// needed to proceed
344 pub fn try_next_reader(
345 &mut self,
346 ) -> Result<DecodeResult<ParquetRecordBatchReader>, ParquetError> {
347 loop {
348 if !self.row_group_reader_builder.has_active_row_group() {
349 // We are done with the previous row group, seek to the next one
350 // from the frontier, if any.
351
352 match self.frontier.next_readable_row_group()? {
353 Some(NextRowGroup {
354 row_group_idx,
355 row_count,
356 selection,
357 budget,
358 }) => {
359 self.row_group_reader_builder.next_row_group(
360 row_group_idx,
361 row_count,
362 selection,
363 budget,
364 )?;
365 }
366 None => return Ok(DecodeResult::Finished),
367 }
368 }
369
370 match self.row_group_reader_builder.try_build()? {
371 RowGroupBuildResult::Finished { remaining_budget } => {
372 self.frontier
373 .update_budget_after_row_group(remaining_budget);
374 // reader is done, proceed to the next row group
375 }
376 RowGroupBuildResult::NeedsData(ranges) => {
377 // need more data to proceed
378 return Ok(DecodeResult::NeedsData(ranges));
379 }
380 RowGroupBuildResult::Data {
381 batch_reader,
382 remaining_budget,
383 } => {
384 self.frontier
385 .update_budget_after_row_group(remaining_budget);
386 // ready to read the row group
387 return Ok(DecodeResult::Data(batch_reader));
388 }
389 }
390 }
391 }
392}