parquet/arrow/push_decoder/remaining.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::DecodeResult;
19use crate::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection};
20use crate::arrow::push_decoder::reader_builder::{
21 RowBudget, RowGroupBuildResult, RowGroupReaderBuilder, RowGroupReaderBuilderParts,
22};
23use crate::errors::ParquetError;
24use crate::file::metadata::ParquetMetaData;
25use arrow_schema::SchemaRef;
26use bytes::Bytes;
27use std::collections::VecDeque;
28use std::ops::Range;
29use std::sync::Arc;
30
/// Outcome of planning the next queued row group, after its slice of the
/// global row selection has been taken.
///
/// Produced by `RowGroupFrontier::plan_selected_row_group` and consumed by
/// `RowGroupFrontier::next_readable_row_group`.
#[derive(Debug)]
enum QueuedRowGroupDecision {
    /// Hand this row group to the builder for decoding.
    Read(NextRowGroup),
    /// Skip this row group without decoding it and keep scanning the queue,
    /// continuing with `remaining_budget` as the frontier's updated
    /// offset/limit budget.
    Skip { remaining_budget: RowBudget },
}
39
/// Work item handed from [`RowGroupFrontier`] to [`RowGroupReaderBuilder`].
#[derive(Debug)]
struct NextRowGroup {
    /// Index of the row group in the file metadata.
    row_group_idx: usize,
    /// Total number of rows in the row group, as recorded in the metadata.
    row_count: usize,
    /// This row group's slice of the global selection, or `None` when all rows
    /// are selected (including when no global selection was supplied).
    selection: Option<RowSelection>,
    /// Snapshot of the frontier's offset/limit budget taken when this row
    /// group was planned; passed to the builder to apply while decoding.
    budget: RowBudget,
}
51
/// Cross-row-group scan state.
///
/// It holds the queue of row groups still to visit, the unconsumed part of the
/// global row selection, and the offset/limit budget.
/// `RowGroupFrontier::next_readable_row_group` walks this queue, skipping row
/// groups that need not be decoded, and yields the next one to read.
#[derive(Debug)]
struct RowGroupFrontier {
    /// Metadata used to resolve row counts for queued row groups.
    parquet_metadata: Arc<ParquetMetaData>,
    /// Row group indices not yet handed to the builder, in visit order.
    row_groups: VecDeque<usize>,
    /// Cross-row-group cursor for the optional global row selection; each
    /// visited row group splits its own rows off the front.
    selection: Option<RowSelection>,
    /// Offset/limit budget before the next readable row group is planned.
    budget: RowBudget,
    /// If predicates are present, row groups with selected rows must be read so
    /// the predicate can decide whether they are actually needed (the
    /// offset/limit budget is then not used to skip them up front).
    has_predicates: bool,
}
66
67impl RowGroupFrontier {
68 fn new(
69 parquet_metadata: Arc<ParquetMetaData>,
70 row_groups: Vec<usize>,
71 selection: Option<RowSelection>,
72 budget: RowBudget,
73 has_predicates: bool,
74 ) -> Self {
75 Self {
76 parquet_metadata,
77 row_groups: VecDeque::from(row_groups),
78 selection,
79 budget,
80 has_predicates,
81 }
82 }
83
84 fn row_group_num_rows(&self, row_group_idx: usize) -> Result<usize, ParquetError> {
85 self.parquet_metadata
86 .row_group(row_group_idx)
87 .num_rows()
88 .try_into()
89 .map_err(|e| ParquetError::General(format!("Row count overflow: {e}")))
90 }
91
92 fn update_budget_after_row_group(&mut self, budget: RowBudget) {
93 self.budget = budget;
94 }
95
96 fn clear_remaining(&mut self) {
97 self.selection = None;
98 self.row_groups.clear();
99 }
100
101 /// Plan whether a selected row group should be read or skipped.
102 ///
103 /// Selection-only skips are handled before this method is called. This
104 /// method applies the remaining offset/limit budget and predicate
105 /// conservatism.
106 fn plan_selected_row_group(
107 &self,
108 next_row_group: NextRowGroup,
109 selected_rows: usize,
110 ) -> QueuedRowGroupDecision {
111 if self.has_predicates {
112 return QueuedRowGroupDecision::Read(next_row_group);
113 }
114
115 let rows_after_budget = self.budget.rows_after(selected_rows);
116 if rows_after_budget != 0 {
117 return QueuedRowGroupDecision::Read(next_row_group);
118 }
119
120 QueuedRowGroupDecision::Skip {
121 remaining_budget: self.budget.advance(selected_rows, rows_after_budget),
122 }
123 }
124
125 /// Advance queued row groups until one should be handed to the builder.
126 fn next_readable_row_group(&mut self) -> Result<Option<NextRowGroup>, ParquetError> {
127 loop {
128 let Some(&row_group_idx) = self.row_groups.front() else {
129 return Ok(None);
130 };
131 if self.budget.is_exhausted()
132 || self
133 .selection
134 .as_ref()
135 .is_some_and(|selection| selection.row_count() == 0)
136 {
137 self.clear_remaining();
138 return Ok(None);
139 }
140
141 let row_count = self.row_group_num_rows(row_group_idx)?;
142 let (selection, selected_rows) = match self.selection.as_mut() {
143 Some(selection) => {
144 let selection = selection.split_off(row_count);
145 let selected_rows = selection.row_count();
146 if selected_rows == 0 {
147 self.row_groups.pop_front();
148 continue;
149 }
150
151 let selection = if selected_rows == row_count {
152 None
153 } else {
154 Some(selection)
155 };
156 (selection, selected_rows)
157 }
158 None => (None, row_count),
159 };
160
161 let next_row_group = NextRowGroup {
162 row_group_idx,
163 row_count,
164 selection,
165 budget: self.budget,
166 };
167
168 match self.plan_selected_row_group(next_row_group, selected_rows) {
169 QueuedRowGroupDecision::Read(next_row_group) => {
170 self.row_groups.pop_front();
171 return Ok(Some(next_row_group));
172 }
173 QueuedRowGroupDecision::Skip { remaining_budget } => {
174 self.row_groups.pop_front();
175 self.budget = remaining_budget;
176 }
177 }
178 }
179 }
180}
181
/// State machine that tracks the remaining high level chunks (row groups) of
/// Parquet data left to read.
///
/// [`RowGroupFrontier`] owns cross-row-group scan state and selects the next
/// work item. [`RowGroupReaderBuilder`] owns decoding for the active row group.
#[derive(Debug)]
pub(crate) struct RemainingRowGroups {
    /// The arrow schema of the decoded output. Carried only so
    /// [`Self::into_parts`] can hand it to a rebuilt builder; unused while
    /// decoding.
    schema: SchemaRef,

    /// Cross-row-group scan state for queued work.
    frontier: RowGroupFrontier,

    /// State for building the reader for the current row group; it also
    /// receives the data buffers pushed via [`Self::push_data`].
    row_group_reader_builder: RowGroupReaderBuilder,
}
200
/// The state recovered from a [`RemainingRowGroups`] by
/// [`RemainingRowGroups::into_parts`].
///
/// It describes the row groups *not* yet decoded, so that a builder
/// reconstructed from it resumes where the decoder left off.
#[derive(Debug)]
pub(crate) struct RemainingRowGroupsParts {
    /// The arrow schema of the decoded output.
    pub schema: SchemaRef,
    /// The Parquet file metadata.
    pub metadata: Arc<ParquetMetaData>,
    /// Row groups not yet handed to the reader builder, in visit order.
    pub row_groups: Vec<usize>,
    /// The not-yet-consumed slice of the global row selection.
    pub selection: Option<RowSelection>,
    /// Offset still to be skipped before the next readable row group, taken
    /// from the frontier's budget (presumably `None` means no offset was
    /// configured — confirm against `RowBudget::offset`).
    pub offset: Option<usize>,
    /// Output rows still permitted across the remaining row groups, taken
    /// from the frontier's budget (presumably `None` means unlimited —
    /// confirm against `RowBudget::limit`).
    pub limit: Option<usize>,
    /// Builder-configurable parts of the inner row-group reader builder.
    pub reader_builder: RowGroupReaderBuilderParts,
}
221
222impl RemainingRowGroups {
223 pub fn new(
224 schema: SchemaRef,
225 parquet_metadata: Arc<ParquetMetaData>,
226 row_groups: Vec<usize>,
227 selection: Option<RowSelection>,
228 budget: RowBudget,
229 has_predicates: bool,
230 row_group_reader_builder: RowGroupReaderBuilder,
231 ) -> Self {
232 Self {
233 schema,
234 frontier: RowGroupFrontier::new(
235 parquet_metadata,
236 row_groups,
237 selection,
238 budget,
239 has_predicates,
240 ),
241 row_group_reader_builder,
242 }
243 }
244
245 /// Decompose into [`RemainingRowGroupsParts`].
246 ///
247 /// Must be called at a row-group boundary (see
248 /// [`Self::is_at_row_group_boundary`]). The inner reader builder's runtime
249 /// decode state is discarded; its buffered bytes are carried through.
250 pub(crate) fn into_parts(self) -> RemainingRowGroupsParts {
251 let Self {
252 schema,
253 frontier,
254 row_group_reader_builder,
255 } = self;
256 // `has_predicates` is recomputed by `build()` from the filter.
257 let RowGroupFrontier {
258 parquet_metadata,
259 row_groups,
260 selection,
261 budget,
262 has_predicates: _,
263 } = frontier;
264 RemainingRowGroupsParts {
265 schema,
266 metadata: parquet_metadata,
267 row_groups: Vec::from(row_groups),
268 selection,
269 offset: budget.offset(),
270 limit: budget.limit(),
271 reader_builder: row_group_reader_builder.into_parts(),
272 }
273 }
274
275 /// Push new data buffers that can be used to satisfy pending requests
276 pub fn push_data(&mut self, ranges: Vec<Range<u64>>, buffers: Vec<Bytes>) {
277 self.row_group_reader_builder.push_data(ranges, buffers);
278 }
279
280 /// Return the total number of bytes buffered so far
281 pub fn buffered_bytes(&self) -> u64 {
282 self.row_group_reader_builder.buffered_bytes()
283 }
284
285 /// Clear any staged ranges currently buffered for future decode work
286 pub fn clear_all_ranges(&mut self) {
287 self.row_group_reader_builder.clear_all_ranges();
288 }
289
290 /// True iff the inner row-group reader is between row groups (state
291 /// `Finished`). Forward to [`RowGroupReaderBuilder::is_finished`].
292 pub fn is_at_row_group_boundary(&self) -> bool {
293 self.row_group_reader_builder.is_finished()
294 }
295
296 /// Number of row groups remaining (not including the one currently
297 /// being decoded).
298 pub fn row_groups_remaining(&self) -> usize {
299 self.frontier.row_groups.len()
300 }
301
302 /// returns [`ParquetRecordBatchReader`] suitable for reading the next
303 /// group of rows from the Parquet data, or the list of data ranges still
304 /// needed to proceed
305 pub fn try_next_reader(
306 &mut self,
307 ) -> Result<DecodeResult<ParquetRecordBatchReader>, ParquetError> {
308 loop {
309 if !self.row_group_reader_builder.has_active_row_group() {
310 // We are done with the previous row group, seek to the next one
311 // from the frontier, if any.
312
313 match self.frontier.next_readable_row_group()? {
314 Some(NextRowGroup {
315 row_group_idx,
316 row_count,
317 selection,
318 budget,
319 }) => {
320 self.row_group_reader_builder.next_row_group(
321 row_group_idx,
322 row_count,
323 selection,
324 budget,
325 )?;
326 }
327 None => return Ok(DecodeResult::Finished),
328 }
329 }
330
331 match self.row_group_reader_builder.try_build()? {
332 RowGroupBuildResult::Finished { remaining_budget } => {
333 self.frontier
334 .update_budget_after_row_group(remaining_budget);
335 // reader is done, proceed to the next row group
336 }
337 RowGroupBuildResult::NeedsData(ranges) => {
338 // need more data to proceed
339 return Ok(DecodeResult::NeedsData(ranges));
340 }
341 RowGroupBuildResult::Data {
342 batch_reader,
343 remaining_budget,
344 } => {
345 self.frontier
346 .update_budget_after_row_group(remaining_budget);
347 // ready to read the row group
348 return Ok(DecodeResult::Data(batch_reader));
349 }
350 }
351 }
352 }
353}