parquet/arrow/push_decoder/
remaining.rs1use crate::DecodeResult;
19use crate::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection};
20use crate::arrow::push_decoder::reader_builder::{
21 RowBudget, RowGroupBuildResult, RowGroupReaderBuilder,
22};
23use crate::errors::ParquetError;
24use crate::file::metadata::ParquetMetaData;
25use bytes::Bytes;
26use std::collections::VecDeque;
27use std::ops::Range;
28use std::sync::Arc;
29
/// Outcome of planning the row group currently at the front of the
/// frontier's queue (produced by `RowGroupFrontier::plan_selected_row_group`).
#[derive(Debug)]
enum QueuedRowGroupDecision {
    /// The row group should be handed to the reader builder for decoding.
    Read(NextRowGroup),
    /// The row group is dropped from the queue without being read.
    /// `remaining_budget` is the budget after `RowBudget::advance` has
    /// accounted for the row group's selected rows.
    Skip { remaining_budget: RowBudget },
}
38
/// A row group chosen by the frontier for reading, bundled with the
/// arguments passed to `RowGroupReaderBuilder::next_row_group`.
#[derive(Debug)]
struct NextRowGroup {
    /// Index of the row group within the file metadata.
    row_group_idx: usize,
    /// Total number of rows in the row group, as recorded in the metadata.
    row_count: usize,
    /// Selection local to this row group; `None` means every row in the
    /// row group is selected (either no selection was supplied, or the split
    /// selection covered all `row_count` rows).
    selection: Option<RowSelection>,
    /// Copy of the frontier's budget at the moment this row group was chosen.
    budget: RowBudget,
}
50
/// Queue of row groups not yet handed to the reader builder, together with
/// the state (global row selection and row budget) used to decide which of
/// them to read and which to skip.
#[derive(Debug)]
struct RowGroupFrontier {
    /// File metadata; used to look up each row group's row count.
    parquet_metadata: Arc<ParquetMetaData>,
    /// Indices of row groups still to visit, in visiting order.
    row_groups: VecDeque<usize>,
    /// Selection over the rows of the remaining row groups. Each visited
    /// row group's rows are split off the front of it.
    selection: Option<RowSelection>,
    /// Current row budget. Once it reports exhaustion, the remaining queue
    /// is cleared.
    budget: RowBudget,
    /// Whether row predicates are in use. When true, row groups are always
    /// read and never skipped based on the budget.
    has_predicates: bool,
}
65
66impl RowGroupFrontier {
67 fn new(
68 parquet_metadata: Arc<ParquetMetaData>,
69 row_groups: Vec<usize>,
70 selection: Option<RowSelection>,
71 budget: RowBudget,
72 has_predicates: bool,
73 ) -> Self {
74 Self {
75 parquet_metadata,
76 row_groups: VecDeque::from(row_groups),
77 selection,
78 budget,
79 has_predicates,
80 }
81 }
82
83 fn row_group_num_rows(&self, row_group_idx: usize) -> Result<usize, ParquetError> {
84 self.parquet_metadata
85 .row_group(row_group_idx)
86 .num_rows()
87 .try_into()
88 .map_err(|e| ParquetError::General(format!("Row count overflow: {e}")))
89 }
90
91 fn update_budget_after_row_group(&mut self, budget: RowBudget) {
92 self.budget = budget;
93 }
94
95 fn clear_remaining(&mut self) {
96 self.selection = None;
97 self.row_groups.clear();
98 }
99
100 fn plan_selected_row_group(
106 &self,
107 next_row_group: NextRowGroup,
108 selected_rows: usize,
109 ) -> QueuedRowGroupDecision {
110 if self.has_predicates {
111 return QueuedRowGroupDecision::Read(next_row_group);
112 }
113
114 let rows_after_budget = self.budget.rows_after(selected_rows);
115 if rows_after_budget != 0 {
116 return QueuedRowGroupDecision::Read(next_row_group);
117 }
118
119 QueuedRowGroupDecision::Skip {
120 remaining_budget: self.budget.advance(selected_rows, rows_after_budget),
121 }
122 }
123
124 fn next_readable_row_group(&mut self) -> Result<Option<NextRowGroup>, ParquetError> {
126 loop {
127 let Some(&row_group_idx) = self.row_groups.front() else {
128 return Ok(None);
129 };
130 if self.budget.is_exhausted()
131 || self
132 .selection
133 .as_ref()
134 .is_some_and(|selection| selection.row_count() == 0)
135 {
136 self.clear_remaining();
137 return Ok(None);
138 }
139
140 let row_count = self.row_group_num_rows(row_group_idx)?;
141 let (selection, selected_rows) = match self.selection.as_mut() {
142 Some(selection) => {
143 let selection = selection.split_off(row_count);
144 let selected_rows = selection.row_count();
145 if selected_rows == 0 {
146 self.row_groups.pop_front();
147 continue;
148 }
149
150 let selection = if selected_rows == row_count {
151 None
152 } else {
153 Some(selection)
154 };
155 (selection, selected_rows)
156 }
157 None => (None, row_count),
158 };
159
160 let next_row_group = NextRowGroup {
161 row_group_idx,
162 row_count,
163 selection,
164 budget: self.budget,
165 };
166
167 match self.plan_selected_row_group(next_row_group, selected_rows) {
168 QueuedRowGroupDecision::Read(next_row_group) => {
169 self.row_groups.pop_front();
170 return Ok(Some(next_row_group));
171 }
172 QueuedRowGroupDecision::Skip { remaining_budget } => {
173 self.row_groups.pop_front();
174 self.budget = remaining_budget;
175 }
176 }
177 }
178 }
179}
180
/// Row groups still to be decoded. It pairs the planning state
/// (`RowGroupFrontier`) with the builder that fetches data and constructs
/// per-row-group readers.
#[derive(Debug)]
pub(crate) struct RemainingRowGroups {
    /// Which row groups remain, plus the selection and budget used to pick them.
    frontier: RowGroupFrontier,

    /// Builder for the row group currently being decoded; it also holds the
    /// data pushed in via `push_data`.
    row_group_reader_builder: RowGroupReaderBuilder,
}
194
195impl RemainingRowGroups {
196 pub fn new(
197 parquet_metadata: Arc<ParquetMetaData>,
198 row_groups: Vec<usize>,
199 selection: Option<RowSelection>,
200 budget: RowBudget,
201 has_predicates: bool,
202 row_group_reader_builder: RowGroupReaderBuilder,
203 ) -> Self {
204 Self {
205 frontier: RowGroupFrontier::new(
206 parquet_metadata,
207 row_groups,
208 selection,
209 budget,
210 has_predicates,
211 ),
212 row_group_reader_builder,
213 }
214 }
215
216 pub fn push_data(&mut self, ranges: Vec<Range<u64>>, buffers: Vec<Bytes>) {
218 self.row_group_reader_builder.push_data(ranges, buffers);
219 }
220
221 pub fn buffered_bytes(&self) -> u64 {
223 self.row_group_reader_builder.buffered_bytes()
224 }
225
226 pub fn clear_all_ranges(&mut self) {
228 self.row_group_reader_builder.clear_all_ranges();
229 }
230
231 pub fn try_next_reader(
235 &mut self,
236 ) -> Result<DecodeResult<ParquetRecordBatchReader>, ParquetError> {
237 loop {
238 if !self.row_group_reader_builder.has_active_row_group() {
239 match self.frontier.next_readable_row_group()? {
243 Some(NextRowGroup {
244 row_group_idx,
245 row_count,
246 selection,
247 budget,
248 }) => {
249 self.row_group_reader_builder.next_row_group(
250 row_group_idx,
251 row_count,
252 selection,
253 budget,
254 )?;
255 }
256 None => return Ok(DecodeResult::Finished),
257 }
258 }
259
260 match self.row_group_reader_builder.try_build()? {
261 RowGroupBuildResult::Finished { remaining_budget } => {
262 self.frontier
263 .update_budget_after_row_group(remaining_budget);
264 }
266 RowGroupBuildResult::NeedsData(ranges) => {
267 return Ok(DecodeResult::NeedsData(ranges));
269 }
270 RowGroupBuildResult::Data {
271 batch_reader,
272 remaining_budget,
273 } => {
274 self.frontier
275 .update_budget_after_row_group(remaining_budget);
276 return Ok(DecodeResult::Data(batch_reader));
278 }
279 }
280 }
281 }
282}