1mod data;
19mod filter;
20
21use crate::DecodeResult;
22use crate::arrow::ProjectionMask;
23use crate::arrow::array_reader::{ArrayReaderBuilder, RowGroupCache};
24use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
25use crate::arrow::arrow_reader::selection::RowSelectionStrategy;
26use crate::arrow::arrow_reader::{
27 ParquetRecordBatchReader, ReadPlanBuilder, RowFilter, RowSelection, RowSelectionPolicy,
28};
29use crate::arrow::in_memory_row_group::ColumnChunkData;
30use crate::arrow::push_decoder::reader_builder::data::DataRequestBuilder;
31use crate::arrow::push_decoder::reader_builder::filter::CacheInfo;
32use crate::arrow::schema::ParquetField;
33use crate::errors::ParquetError;
34use crate::file::metadata::ParquetMetaData;
35use crate::file::page_index::offset_index::OffsetIndexMetaData;
36use crate::util::push_buffers::PushBuffers;
37use bytes::Bytes;
38use data::DataRequest;
39use filter::AdvanceResult;
40use filter::FilterInfo;
41use std::ops::Range;
42use std::sync::{Arc, Mutex};
43
44#[derive(Debug)]
46struct RowGroupInfo {
47 row_group_idx: usize,
48 row_count: usize,
49 plan_builder: ReadPlanBuilder,
50}
51
52#[derive(Debug)]
54enum RowGroupDecoderState {
55 Start {
56 row_group_info: RowGroupInfo,
57 },
58 Filters {
60 row_group_info: RowGroupInfo,
61 column_chunks: Option<Vec<Option<Arc<ColumnChunkData>>>>,
63 filter_info: FilterInfo,
64 },
65 WaitingOnFilterData {
67 row_group_info: RowGroupInfo,
68 filter_info: FilterInfo,
69 data_request: DataRequest,
70 },
71 StartData {
73 row_group_info: RowGroupInfo,
74 column_chunks: Option<Vec<Option<Arc<ColumnChunkData>>>>,
76 cache_info: Option<CacheInfo>,
78 },
79 WaitingOnData {
81 row_group_info: RowGroupInfo,
82 data_request: DataRequest,
83 cache_info: Option<CacheInfo>,
85 },
86 Finished,
88}
89
90#[derive(Debug)]
92struct NextState {
93 next_state: RowGroupDecoderState,
94 result: Option<DecodeResult<ParquetRecordBatchReader>>,
99}
100
101impl NextState {
102 fn again(next_state: RowGroupDecoderState) -> Self {
106 Self {
107 next_state,
108 result: None,
109 }
110 }
111
112 fn result(
114 next_state: RowGroupDecoderState,
115 result: DecodeResult<ParquetRecordBatchReader>,
116 ) -> Self {
117 Self {
118 next_state,
119 result: Some(result),
120 }
121 }
122}
123
124#[derive(Debug)]
130pub(crate) struct RowGroupReaderBuilder {
131 batch_size: usize,
133
134 projection: ProjectionMask,
136
137 metadata: Arc<ParquetMetaData>,
139
140 fields: Option<Arc<ParquetField>>,
142
143 filter: Option<RowFilter>,
145
146 limit: Option<usize>,
148
149 offset: Option<usize>,
151
152 max_predicate_cache_size: usize,
156
157 metrics: ArrowReaderMetrics,
159
160 row_selection_policy: RowSelectionPolicy,
162
163 state: Option<RowGroupDecoderState>,
168
169 buffers: PushBuffers,
171}
172
173impl RowGroupReaderBuilder {
174 #[expect(clippy::too_many_arguments)]
176 pub(crate) fn new(
177 batch_size: usize,
178 projection: ProjectionMask,
179 metadata: Arc<ParquetMetaData>,
180 fields: Option<Arc<ParquetField>>,
181 filter: Option<RowFilter>,
182 limit: Option<usize>,
183 offset: Option<usize>,
184 metrics: ArrowReaderMetrics,
185 max_predicate_cache_size: usize,
186 buffers: PushBuffers,
187 row_selection_policy: RowSelectionPolicy,
188 ) -> Self {
189 Self {
190 batch_size,
191 projection,
192 metadata,
193 fields,
194 filter,
195 limit,
196 offset,
197 metrics,
198 max_predicate_cache_size,
199 row_selection_policy,
200 state: Some(RowGroupDecoderState::Finished),
201 buffers,
202 }
203 }
204
205 pub fn push_data(&mut self, ranges: Vec<Range<u64>>, buffers: Vec<Bytes>) {
207 self.buffers.push_ranges(ranges, buffers);
208 }
209
210 pub fn buffered_bytes(&self) -> u64 {
212 self.buffers.buffered_bytes()
213 }
214
215 fn take_state(&mut self) -> Result<RowGroupDecoderState, ParquetError> {
223 self.state.take().ok_or_else(|| {
224 ParquetError::General(String::from(
225 "Internal Error: RowGroupReader in invalid state",
226 ))
227 })
228 }
229
230 pub(crate) fn next_row_group(
232 &mut self,
233 row_group_idx: usize,
234 row_count: usize,
235 selection: Option<RowSelection>,
236 ) -> Result<(), ParquetError> {
237 let state = self.take_state()?;
238 if !matches!(state, RowGroupDecoderState::Finished) {
239 return Err(ParquetError::General(format!(
240 "Internal Error: next_row_group called while still reading a row group. Expected Finished state, got {state:?}"
241 )));
242 }
243 let plan_builder = ReadPlanBuilder::new(self.batch_size)
244 .with_selection(selection)
245 .with_row_selection_policy(self.row_selection_policy);
246
247 let row_group_info = RowGroupInfo {
248 row_group_idx,
249 row_count,
250 plan_builder,
251 };
252
253 self.state = Some(RowGroupDecoderState::Start { row_group_info });
254 Ok(())
255 }
256
257 pub(crate) fn try_build(
265 &mut self,
266 ) -> Result<DecodeResult<ParquetRecordBatchReader>, ParquetError> {
267 loop {
268 let current_state = self.take_state()?;
269 match self.try_transition(current_state)? {
271 NextState {
273 next_state,
274 result: Some(result),
275 } => {
276 self.state = Some(next_state);
278 return Ok(result);
279 }
280 NextState {
282 next_state,
283 result: None,
284 } => {
285 self.state = Some(next_state);
287 }
288 }
289 }
290 }
291
292 fn try_transition(
302 &mut self,
303 current_state: RowGroupDecoderState,
304 ) -> Result<NextState, ParquetError> {
305 let result = match current_state {
306 RowGroupDecoderState::Start { row_group_info } => {
307 let column_chunks = None; let Some(filter) = self.filter.take() else {
310 return Ok(NextState::again(RowGroupDecoderState::StartData {
312 row_group_info,
313 column_chunks,
314 cache_info: None,
315 }));
316 };
317 if filter.predicates.is_empty() {
319 return Ok(NextState::again(RowGroupDecoderState::StartData {
320 row_group_info,
321 column_chunks,
322 cache_info: None,
323 }));
324 };
325
326 let cache_projection =
328 self.compute_cache_projection(row_group_info.row_group_idx, &filter);
329
330 let cache_info = CacheInfo::new(
331 cache_projection,
332 Arc::new(Mutex::new(RowGroupCache::new(
333 self.batch_size,
334 self.max_predicate_cache_size,
335 ))),
336 );
337
338 let filter_info = FilterInfo::new(filter, cache_info);
339 NextState::again(RowGroupDecoderState::Filters {
340 row_group_info,
341 filter_info,
342 column_chunks,
343 })
344 }
345 RowGroupDecoderState::Filters {
347 row_group_info,
348 column_chunks,
349 filter_info,
350 } => {
351 let RowGroupInfo {
352 row_group_idx,
353 row_count,
354 plan_builder,
355 } = row_group_info;
356
357 if !plan_builder.selects_any() {
359 self.filter = Some(filter_info.into_filter());
361 return Ok(NextState::result(
362 RowGroupDecoderState::Finished,
363 DecodeResult::Finished,
364 ));
365 }
366
367 let predicate = filter_info.current();
369
370 let data_request = DataRequestBuilder::new(
373 row_group_idx,
374 row_count,
375 self.batch_size,
376 &self.metadata,
377 predicate.projection(), )
379 .with_selection(plan_builder.selection())
380 .with_cache_projection(Some(filter_info.cache_projection()))
382 .with_column_chunks(column_chunks)
383 .build();
384
385 let row_group_info = RowGroupInfo {
386 row_group_idx,
387 row_count,
388 plan_builder,
389 };
390
391 NextState::again(RowGroupDecoderState::WaitingOnFilterData {
392 row_group_info,
393 filter_info,
394 data_request,
395 })
396 }
397 RowGroupDecoderState::WaitingOnFilterData {
398 row_group_info,
399 data_request,
400 mut filter_info,
401 } => {
402 let needed_ranges = data_request.needed_ranges(&self.buffers);
404 if !needed_ranges.is_empty() {
405 return Ok(NextState::result(
407 RowGroupDecoderState::WaitingOnFilterData {
408 row_group_info,
409 filter_info,
410 data_request,
411 },
412 DecodeResult::NeedsData(needed_ranges),
413 ));
414 }
415
416 let RowGroupInfo {
418 row_group_idx,
419 row_count,
420 mut plan_builder,
421 } = row_group_info;
422
423 let predicate = filter_info.current();
424
425 let row_group = data_request.try_into_in_memory_row_group(
426 row_group_idx,
427 row_count,
428 &self.metadata,
429 predicate.projection(),
430 &mut self.buffers,
431 )?;
432
433 let cache_options = filter_info.cache_builder().producer();
434
435 let array_reader = ArrayReaderBuilder::new(&row_group, &self.metrics)
436 .with_cache_options(Some(&cache_options))
437 .with_parquet_metadata(&self.metadata)
438 .build_array_reader(self.fields.as_deref(), predicate.projection())?;
439
440 plan_builder = override_selector_strategy_if_needed(
444 plan_builder,
445 predicate.projection(),
446 self.row_group_offset_index(row_group_idx),
447 );
448 plan_builder =
451 plan_builder.with_predicate(array_reader, filter_info.current_mut())?;
452
453 let row_group_info = RowGroupInfo {
454 row_group_idx,
455 row_count,
456 plan_builder,
457 };
458
459 let column_chunks = Some(row_group.column_chunks);
461
462 match filter_info.advance() {
464 AdvanceResult::Continue(filter_info) => {
465 NextState::again(RowGroupDecoderState::Filters {
466 row_group_info,
467 column_chunks,
468 filter_info,
469 })
470 }
471 AdvanceResult::Done(filter, cache_info) => {
473 assert!(self.filter.is_none());
475 self.filter = Some(filter);
476 NextState::again(RowGroupDecoderState::StartData {
477 row_group_info,
478 column_chunks,
479 cache_info: Some(cache_info),
480 })
481 }
482 }
483 }
484 RowGroupDecoderState::StartData {
485 row_group_info,
486 column_chunks,
487 cache_info,
488 } => {
489 let RowGroupInfo {
490 row_group_idx,
491 row_count,
492 plan_builder,
493 } = row_group_info;
494
495 let rows_before = plan_builder.num_rows_selected().unwrap_or(row_count);
497
498 if rows_before == 0 {
499 return Ok(NextState::result(
501 RowGroupDecoderState::Finished,
502 DecodeResult::Finished,
503 ));
504 }
505
506 let mut plan_builder = plan_builder
508 .limited(row_count)
509 .with_offset(self.offset)
510 .with_limit(self.limit)
511 .build_limited();
512
513 let rows_after = plan_builder.num_rows_selected().unwrap_or(row_count);
514
515 if let Some(offset) = &mut self.offset {
517 *offset = offset.saturating_sub(rows_before - rows_after)
520 }
521
522 if rows_after == 0 {
523 return Ok(NextState::result(
525 RowGroupDecoderState::Finished,
526 DecodeResult::Finished,
527 ));
528 }
529
530 if let Some(limit) = &mut self.limit {
531 *limit -= rows_after;
532 }
533
534 let data_request = DataRequestBuilder::new(
535 row_group_idx,
536 row_count,
537 self.batch_size,
538 &self.metadata,
539 &self.projection,
540 )
541 .with_selection(plan_builder.selection())
542 .with_column_chunks(column_chunks)
543 .build();
546
547 plan_builder = plan_builder.with_row_selection_policy(self.row_selection_policy);
548
549 plan_builder = override_selector_strategy_if_needed(
550 plan_builder,
551 &self.projection,
552 self.row_group_offset_index(row_group_idx),
553 );
554
555 let row_group_info = RowGroupInfo {
556 row_group_idx,
557 row_count,
558 plan_builder,
559 };
560
561 NextState::again(RowGroupDecoderState::WaitingOnData {
562 row_group_info,
563 data_request,
564 cache_info,
565 })
566 }
567 RowGroupDecoderState::WaitingOnData {
569 row_group_info,
570 data_request,
571 cache_info,
572 } => {
573 let needed_ranges = data_request.needed_ranges(&self.buffers);
574 if !needed_ranges.is_empty() {
575 return Ok(NextState::result(
577 RowGroupDecoderState::WaitingOnData {
578 row_group_info,
579 data_request,
580 cache_info,
581 },
582 DecodeResult::NeedsData(needed_ranges),
583 ));
584 }
585
586 let RowGroupInfo {
588 row_group_idx,
589 row_count,
590 plan_builder,
591 } = row_group_info;
592
593 let row_group = data_request.try_into_in_memory_row_group(
594 row_group_idx,
595 row_count,
596 &self.metadata,
597 &self.projection,
598 &mut self.buffers,
599 )?;
600
601 let plan = plan_builder.build();
602
603 let array_reader_builder = ArrayReaderBuilder::new(&row_group, &self.metrics)
605 .with_parquet_metadata(&self.metadata);
606 let array_reader = if let Some(cache_info) = cache_info.as_ref() {
607 let cache_options = cache_info.builder().consumer();
608 array_reader_builder
609 .with_cache_options(Some(&cache_options))
610 .build_array_reader(self.fields.as_deref(), &self.projection)
611 } else {
612 array_reader_builder
613 .build_array_reader(self.fields.as_deref(), &self.projection)
614 }?;
615
616 let reader = ParquetRecordBatchReader::new(array_reader, plan);
617 NextState::result(RowGroupDecoderState::Finished, DecodeResult::Data(reader))
618 }
619 RowGroupDecoderState::Finished => {
620 NextState::result(RowGroupDecoderState::Finished, DecodeResult::Finished)
622 }
623 };
624 Ok(result)
625 }
626
627 fn compute_cache_projection(&self, row_group_idx: usize, filter: &RowFilter) -> ProjectionMask {
632 let meta = self.metadata.row_group(row_group_idx);
633 match self.compute_cache_projection_inner(filter) {
634 Some(projection) => projection,
635 None => ProjectionMask::none(meta.columns().len()),
636 }
637 }
638
639 fn compute_cache_projection_inner(&self, filter: &RowFilter) -> Option<ProjectionMask> {
640 if self.max_predicate_cache_size == 0 {
642 return None;
643 }
644 let mut cache_projection = filter.predicates.first()?.projection().clone();
645 for predicate in filter.predicates.iter() {
646 cache_projection.union(predicate.projection());
647 }
648 cache_projection.intersect(&self.projection);
649 self.exclude_nested_columns_from_cache(&cache_projection)
650 }
651
652 fn exclude_nested_columns_from_cache(&self, mask: &ProjectionMask) -> Option<ProjectionMask> {
654 mask.without_nested_types(self.metadata.file_metadata().schema_descr())
655 }
656
657 fn row_group_offset_index(&self, row_group_idx: usize) -> Option<&[OffsetIndexMetaData]> {
659 self.metadata
660 .offset_index()
661 .filter(|index| !index.is_empty())
662 .and_then(|index| index.get(row_group_idx))
663 .map(|columns| columns.as_slice())
664 }
665}
666
667fn override_selector_strategy_if_needed(
687 plan_builder: ReadPlanBuilder,
688 projection_mask: &ProjectionMask,
689 offset_index: Option<&[OffsetIndexMetaData]>,
690) -> ReadPlanBuilder {
691 let RowSelectionPolicy::Auto { .. } = plan_builder.row_selection_policy() else {
693 return plan_builder;
694 };
695
696 let preferred_strategy = plan_builder.resolve_selection_strategy();
697
698 let force_selectors = matches!(preferred_strategy, RowSelectionStrategy::Mask)
699 && plan_builder.selection().is_some_and(|selection| {
700 selection.should_force_selectors(projection_mask, offset_index)
701 });
702
703 let resolved_strategy = if force_selectors {
704 RowSelectionStrategy::Selectors
705 } else {
706 preferred_strategy
707 };
708
709 let new_policy = match resolved_strategy {
711 RowSelectionStrategy::Mask => RowSelectionPolicy::Mask,
712 RowSelectionStrategy::Selectors => RowSelectionPolicy::Selectors,
713 };
714
715 plan_builder.with_row_selection_policy(new_policy)
716}
717
#[cfg(test)]
mod tests {
    use super::*;

    /// Pins the size of `RowGroupDecoderState` so that a change in its size
    /// is noticed: the state is moved by value on every transition.
    #[test]
    fn test_structure_size() {
        let state_size = std::mem::size_of::<RowGroupDecoderState>();
        assert_eq!(state_size, 200);
    }
}