1mod data;
19mod filter;
20
21use crate::DecodeResult;
22use crate::arrow::ProjectionMask;
23use crate::arrow::array_reader::{ArrayReaderBuilder, RowGroupCache};
24use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
25use crate::arrow::arrow_reader::selection::RowSelectionStrategy;
26use crate::arrow::arrow_reader::{
27 ParquetRecordBatchReader, ReadPlanBuilder, RowFilter, RowSelection, RowSelectionPolicy,
28};
29use crate::arrow::in_memory_row_group::ColumnChunkData;
30use crate::arrow::push_decoder::reader_builder::data::DataRequestBuilder;
31use crate::arrow::push_decoder::reader_builder::filter::CacheInfo;
32use crate::arrow::schema::ParquetField;
33use crate::errors::ParquetError;
34use crate::file::metadata::ParquetMetaData;
35use crate::file::page_index::offset_index::OffsetIndexMetaData;
36use crate::util::push_buffers::PushBuffers;
37use bytes::Bytes;
38use data::DataRequest;
39use filter::AdvanceResult;
40use filter::FilterInfo;
41use std::ops::Range;
42use std::sync::{Arc, Mutex};
43
/// The row group currently being decoded, together with the
/// [`ReadPlanBuilder`] that accumulates its row selection.
#[derive(Debug)]
struct RowGroupInfo {
    /// Index of the row group within the file metadata.
    row_group_idx: usize,
    /// Number of rows in the row group; used as the selected-row count when
    /// the plan has no explicit selection.
    row_count: usize,
    /// Read plan under construction; refined by each filter predicate and
    /// built into the final plan once the data is available.
    plan_builder: ReadPlanBuilder,
}
51
/// States of the per-row-group decoding state machine driven by
/// `RowGroupReaderBuilder::try_build`.
#[derive(Debug)]
enum RowGroupDecoderState {
    /// A row group was requested via `next_row_group`; nothing has been
    /// requested from the caller yet.
    Start {
        row_group_info: RowGroupInfo,
    },
    /// Ready to build the data request for the current filter predicate.
    Filters {
        row_group_info: RowGroupInfo,
        /// Column chunks already fetched for earlier predicates (`None` before
        /// the first predicate); passed on to the next data request.
        column_chunks: Option<Vec<Option<Arc<ColumnChunkData>>>>,
        /// The row filter being evaluated, tracking the current predicate and
        /// the predicate cache.
        filter_info: FilterInfo,
    },
    /// Waiting for the bytes required to evaluate the current predicate.
    WaitingOnFilterData {
        row_group_info: RowGroupInfo,
        filter_info: FilterInfo,
        data_request: DataRequest,
    },
    /// Filtering is complete (or there was nothing to filter); ready to apply
    /// offset/limit and request data for the output projection.
    StartData {
        row_group_info: RowGroupInfo,
        /// Column chunks fetched while evaluating predicates, if any.
        column_chunks: Option<Vec<Option<Arc<ColumnChunkData>>>>,
        /// Predicate cache; `Some` only when filter predicates were evaluated
        /// for this row group.
        cache_info: Option<CacheInfo>,
    },
    /// Waiting for the bytes required to read the output projection.
    WaitingOnData {
        row_group_info: RowGroupInfo,
        data_request: DataRequest,
        /// Predicate cache to read through, if predicates were evaluated.
        cache_info: Option<CacheInfo>,
    },
    /// No row group in progress. This is the initial state, and the state
    /// entered once a row group has been handed off as a reader or skipped.
    Finished,
}
89
/// Outcome of a single state-machine transition.
#[derive(Debug)]
struct NextState {
    /// State to store before the next transition.
    next_state: RowGroupDecoderState,
    /// `Some` when this value should be returned from `try_build`; `None`
    /// when `try_build` should keep transitioning.
    result: Option<DecodeResult<ParquetRecordBatchReader>>,
}
100
101impl NextState {
102 fn again(next_state: RowGroupDecoderState) -> Self {
106 Self {
107 next_state,
108 result: None,
109 }
110 }
111
112 fn result(
114 next_state: RowGroupDecoderState,
115 result: DecodeResult<ParquetRecordBatchReader>,
116 ) -> Self {
117 Self {
118 next_state,
119 result: Some(result),
120 }
121 }
122}
123
/// Incrementally builds a [`ParquetRecordBatchReader`] for one row group at a
/// time from byte ranges pushed by the caller.
///
/// Call `next_row_group` to start a row group, then call `try_build`
/// repeatedly. Whenever it returns `DecodeResult::NeedsData`, supply the
/// requested ranges with `push_data`.
#[derive(Debug)]
pub(crate) struct RowGroupReaderBuilder {
    /// Rows per batch; passed to read plans, data requests and the predicate cache.
    batch_size: usize,

    /// Columns read for the output.
    projection: ProjectionMask,

    /// Metadata of the file being decoded.
    metadata: Arc<ParquetMetaData>,

    /// Field description passed when building array readers, if any.
    fields: Option<Arc<ParquetField>>,

    /// Row filter, if any. It is moved out of this field while a row group's
    /// predicates are being evaluated.
    filter: Option<RowFilter>,

    /// Remaining row limit across row groups, reduced as row groups are planned.
    limit: Option<usize>,

    /// Remaining rows to skip across row groups, reduced as row groups are planned.
    offset: Option<usize>,

    /// Size budget for the predicate `RowGroupCache`; `0` yields an empty
    /// cache projection.
    max_predicate_cache_size: usize,

    /// Metrics handed to the array reader builder.
    metrics: ArrowReaderMetrics,

    /// Row selection policy applied to read plans.
    row_selection_policy: RowSelectionPolicy,

    /// Current decoder state. It is `None` only after `take_state` has moved it
    /// out, and it stays `None` if the operation that took it returned an error.
    state: Option<RowGroupDecoderState>,

    /// Byte ranges pushed by the caller via `push_data`.
    buffers: PushBuffers,
}
172
173impl RowGroupReaderBuilder {
174 #[expect(clippy::too_many_arguments)]
176 pub(crate) fn new(
177 batch_size: usize,
178 projection: ProjectionMask,
179 metadata: Arc<ParquetMetaData>,
180 fields: Option<Arc<ParquetField>>,
181 filter: Option<RowFilter>,
182 limit: Option<usize>,
183 offset: Option<usize>,
184 metrics: ArrowReaderMetrics,
185 max_predicate_cache_size: usize,
186 buffers: PushBuffers,
187 row_selection_policy: RowSelectionPolicy,
188 ) -> Self {
189 Self {
190 batch_size,
191 projection,
192 metadata,
193 fields,
194 filter,
195 limit,
196 offset,
197 metrics,
198 max_predicate_cache_size,
199 row_selection_policy,
200 state: Some(RowGroupDecoderState::Finished),
201 buffers,
202 }
203 }
204
205 pub fn push_data(&mut self, ranges: Vec<Range<u64>>, buffers: Vec<Bytes>) {
207 self.buffers.push_ranges(ranges, buffers);
208 }
209
210 pub fn buffered_bytes(&self) -> u64 {
212 self.buffers.buffered_bytes()
213 }
214
215 fn take_state(&mut self) -> Result<RowGroupDecoderState, ParquetError> {
223 self.state.take().ok_or_else(|| {
224 ParquetError::General(String::from(
225 "Internal Error: RowGroupReader in invalid state",
226 ))
227 })
228 }
229
230 pub(crate) fn next_row_group(
232 &mut self,
233 row_group_idx: usize,
234 row_count: usize,
235 selection: Option<RowSelection>,
236 ) -> Result<(), ParquetError> {
237 let state = self.take_state()?;
238 if !matches!(state, RowGroupDecoderState::Finished) {
239 return Err(ParquetError::General(format!(
240 "Internal Error: next_row_group called while still reading a row group. Expected Finished state, got {state:?}"
241 )));
242 }
243 let plan_builder = ReadPlanBuilder::new(self.batch_size)
244 .with_selection(selection)
245 .with_row_selection_policy(self.row_selection_policy);
246
247 let row_group_info = RowGroupInfo {
248 row_group_idx,
249 row_count,
250 plan_builder,
251 };
252
253 self.state = Some(RowGroupDecoderState::Start { row_group_info });
254 Ok(())
255 }
256
257 pub(crate) fn try_build(
265 &mut self,
266 ) -> Result<DecodeResult<ParquetRecordBatchReader>, ParquetError> {
267 loop {
268 let current_state = self.take_state()?;
269 match self.try_transition(current_state)? {
271 NextState {
273 next_state,
274 result: Some(result),
275 } => {
276 self.state = Some(next_state);
278 return Ok(result);
279 }
280 NextState {
282 next_state,
283 result: None,
284 } => {
285 self.state = Some(next_state);
287 }
288 }
289 }
290 }
291
292 fn try_transition(
302 &mut self,
303 current_state: RowGroupDecoderState,
304 ) -> Result<NextState, ParquetError> {
305 let result = match current_state {
306 RowGroupDecoderState::Start { row_group_info } => {
307 let column_chunks = None; let Some(filter) = self.filter.take() else {
310 return Ok(NextState::again(RowGroupDecoderState::StartData {
312 row_group_info,
313 column_chunks,
314 cache_info: None,
315 }));
316 };
317 if filter.predicates.is_empty() {
319 return Ok(NextState::again(RowGroupDecoderState::StartData {
320 row_group_info,
321 column_chunks,
322 cache_info: None,
323 }));
324 };
325
326 let cache_projection =
328 self.compute_cache_projection(row_group_info.row_group_idx, &filter);
329
330 let cache_info = CacheInfo::new(
331 cache_projection,
332 Arc::new(Mutex::new(RowGroupCache::new(
333 self.batch_size,
334 self.max_predicate_cache_size,
335 ))),
336 );
337
338 let filter_info = FilterInfo::new(filter, cache_info);
339 NextState::again(RowGroupDecoderState::Filters {
340 row_group_info,
341 filter_info,
342 column_chunks,
343 })
344 }
345 RowGroupDecoderState::Filters {
347 row_group_info,
348 column_chunks,
349 filter_info,
350 } => {
351 let RowGroupInfo {
352 row_group_idx,
353 row_count,
354 plan_builder,
355 } = row_group_info;
356
357 if !plan_builder.selects_any() {
359 self.filter = Some(filter_info.into_filter());
361 return Ok(NextState::result(
362 RowGroupDecoderState::Finished,
363 DecodeResult::Finished,
364 ));
365 }
366
367 let predicate = filter_info.current();
369
370 let data_request = DataRequestBuilder::new(
373 row_group_idx,
374 row_count,
375 self.batch_size,
376 &self.metadata,
377 predicate.projection(), )
379 .with_selection(plan_builder.selection())
380 .with_cache_projection(Some(filter_info.cache_projection()))
382 .with_column_chunks(column_chunks)
383 .build();
384
385 let row_group_info = RowGroupInfo {
386 row_group_idx,
387 row_count,
388 plan_builder,
389 };
390
391 NextState::again(RowGroupDecoderState::WaitingOnFilterData {
392 row_group_info,
393 filter_info,
394 data_request,
395 })
396 }
397 RowGroupDecoderState::WaitingOnFilterData {
398 row_group_info,
399 data_request,
400 mut filter_info,
401 } => {
402 let needed_ranges = data_request.needed_ranges(&self.buffers);
404 if !needed_ranges.is_empty() {
405 return Ok(NextState::result(
407 RowGroupDecoderState::WaitingOnFilterData {
408 row_group_info,
409 filter_info,
410 data_request,
411 },
412 DecodeResult::NeedsData(needed_ranges),
413 ));
414 }
415
416 let RowGroupInfo {
418 row_group_idx,
419 row_count,
420 mut plan_builder,
421 } = row_group_info;
422
423 let predicate = filter_info.current();
424
425 let row_group = data_request.try_into_in_memory_row_group(
426 row_group_idx,
427 row_count,
428 &self.metadata,
429 predicate.projection(),
430 &mut self.buffers,
431 )?;
432
433 let cache_options = filter_info.cache_builder().producer();
434
435 let array_reader = ArrayReaderBuilder::new(&row_group, &self.metrics)
436 .with_cache_options(Some(&cache_options))
437 .with_parquet_metadata(&self.metadata)
438 .build_array_reader(self.fields.as_deref(), predicate.projection())?;
439
440 plan_builder =
441 plan_builder.with_predicate(array_reader, filter_info.current_mut())?;
442
443 let row_group_info = RowGroupInfo {
444 row_group_idx,
445 row_count,
446 plan_builder,
447 };
448
449 let column_chunks = Some(row_group.column_chunks);
451
452 match filter_info.advance() {
454 AdvanceResult::Continue(filter_info) => {
455 NextState::again(RowGroupDecoderState::Filters {
456 row_group_info,
457 column_chunks,
458 filter_info,
459 })
460 }
461 AdvanceResult::Done(filter, cache_info) => {
463 assert!(self.filter.is_none());
465 self.filter = Some(filter);
466 NextState::again(RowGroupDecoderState::StartData {
467 row_group_info,
468 column_chunks,
469 cache_info: Some(cache_info),
470 })
471 }
472 }
473 }
474 RowGroupDecoderState::StartData {
475 row_group_info,
476 column_chunks,
477 cache_info,
478 } => {
479 let RowGroupInfo {
480 row_group_idx,
481 row_count,
482 plan_builder,
483 } = row_group_info;
484
485 let rows_before = plan_builder.num_rows_selected().unwrap_or(row_count);
487
488 if rows_before == 0 {
489 return Ok(NextState::result(
491 RowGroupDecoderState::Finished,
492 DecodeResult::Finished,
493 ));
494 }
495
496 let mut plan_builder = plan_builder
498 .limited(row_count)
499 .with_offset(self.offset)
500 .with_limit(self.limit)
501 .build_limited();
502
503 let rows_after = plan_builder.num_rows_selected().unwrap_or(row_count);
504
505 if let Some(offset) = &mut self.offset {
507 *offset = offset.saturating_sub(rows_before - rows_after)
510 }
511
512 if rows_after == 0 {
513 return Ok(NextState::result(
515 RowGroupDecoderState::Finished,
516 DecodeResult::Finished,
517 ));
518 }
519
520 if let Some(limit) = &mut self.limit {
521 *limit -= rows_after;
522 }
523
524 let data_request = DataRequestBuilder::new(
525 row_group_idx,
526 row_count,
527 self.batch_size,
528 &self.metadata,
529 &self.projection,
530 )
531 .with_selection(plan_builder.selection())
532 .with_column_chunks(column_chunks)
533 .build();
536
537 plan_builder = plan_builder.with_row_selection_policy(self.row_selection_policy);
538
539 plan_builder = override_selector_strategy_if_needed(
540 plan_builder,
541 &self.projection,
542 self.row_group_offset_index(row_group_idx),
543 );
544
545 let row_group_info = RowGroupInfo {
546 row_group_idx,
547 row_count,
548 plan_builder,
549 };
550
551 NextState::again(RowGroupDecoderState::WaitingOnData {
552 row_group_info,
553 data_request,
554 cache_info,
555 })
556 }
557 RowGroupDecoderState::WaitingOnData {
559 row_group_info,
560 data_request,
561 cache_info,
562 } => {
563 let needed_ranges = data_request.needed_ranges(&self.buffers);
564 if !needed_ranges.is_empty() {
565 return Ok(NextState::result(
567 RowGroupDecoderState::WaitingOnData {
568 row_group_info,
569 data_request,
570 cache_info,
571 },
572 DecodeResult::NeedsData(needed_ranges),
573 ));
574 }
575
576 let RowGroupInfo {
578 row_group_idx,
579 row_count,
580 plan_builder,
581 } = row_group_info;
582
583 let row_group = data_request.try_into_in_memory_row_group(
584 row_group_idx,
585 row_count,
586 &self.metadata,
587 &self.projection,
588 &mut self.buffers,
589 )?;
590
591 let plan = plan_builder.build();
592
593 let array_reader_builder = ArrayReaderBuilder::new(&row_group, &self.metrics)
595 .with_parquet_metadata(&self.metadata);
596 let array_reader = if let Some(cache_info) = cache_info.as_ref() {
597 let cache_options = cache_info.builder().consumer();
598 array_reader_builder
599 .with_cache_options(Some(&cache_options))
600 .build_array_reader(self.fields.as_deref(), &self.projection)
601 } else {
602 array_reader_builder
603 .build_array_reader(self.fields.as_deref(), &self.projection)
604 }?;
605
606 let reader = ParquetRecordBatchReader::new(array_reader, plan);
607 NextState::result(RowGroupDecoderState::Finished, DecodeResult::Data(reader))
608 }
609 RowGroupDecoderState::Finished => {
610 NextState::result(RowGroupDecoderState::Finished, DecodeResult::Finished)
612 }
613 };
614 Ok(result)
615 }
616
617 fn compute_cache_projection(&self, row_group_idx: usize, filter: &RowFilter) -> ProjectionMask {
622 let meta = self.metadata.row_group(row_group_idx);
623 match self.compute_cache_projection_inner(filter) {
624 Some(projection) => projection,
625 None => ProjectionMask::none(meta.columns().len()),
626 }
627 }
628
629 fn compute_cache_projection_inner(&self, filter: &RowFilter) -> Option<ProjectionMask> {
630 if self.max_predicate_cache_size == 0 {
632 return None;
633 }
634 let mut cache_projection = filter.predicates.first()?.projection().clone();
635 for predicate in filter.predicates.iter() {
636 cache_projection.union(predicate.projection());
637 }
638 cache_projection.intersect(&self.projection);
639 self.exclude_nested_columns_from_cache(&cache_projection)
640 }
641
642 fn exclude_nested_columns_from_cache(&self, mask: &ProjectionMask) -> Option<ProjectionMask> {
644 mask.without_nested_types(self.metadata.file_metadata().schema_descr())
645 }
646
647 fn row_group_offset_index(&self, row_group_idx: usize) -> Option<&[OffsetIndexMetaData]> {
649 self.metadata
650 .offset_index()
651 .filter(|index| !index.is_empty())
652 .and_then(|index| index.get(row_group_idx))
653 .map(|columns| columns.as_slice())
654 }
655}
656
657fn override_selector_strategy_if_needed(
677 plan_builder: ReadPlanBuilder,
678 projection_mask: &ProjectionMask,
679 offset_index: Option<&[OffsetIndexMetaData]>,
680) -> ReadPlanBuilder {
681 let RowSelectionPolicy::Auto { .. } = plan_builder.row_selection_policy() else {
683 return plan_builder;
684 };
685
686 let preferred_strategy = plan_builder.resolve_selection_strategy();
687
688 let force_selectors = matches!(preferred_strategy, RowSelectionStrategy::Mask)
689 && plan_builder.selection().is_some_and(|selection| {
690 selection.should_force_selectors(projection_mask, offset_index)
691 });
692
693 let resolved_strategy = if force_selectors {
694 RowSelectionStrategy::Selectors
695 } else {
696 preferred_strategy
697 };
698
699 let new_policy = match resolved_strategy {
701 RowSelectionStrategy::Mask => RowSelectionPolicy::Mask,
702 RowSelectionStrategy::Selectors => RowSelectionPolicy::Selectors,
703 };
704
705 plan_builder.with_row_selection_policy(new_policy)
706}
707
#[cfg(test)]
mod tests {
    use super::*;

    /// Pins the in-memory size of `RowGroupDecoderState` so that any change
    /// to it (for example, a larger field in one variant) is noticed.
    #[test]
    fn test_structure_size() {
        let state_size = std::mem::size_of::<RowGroupDecoderState>();
        assert_eq!(state_size, 200);
    }
}