1mod data;
19mod filter;
20
21use crate::DecodeResult;
22use crate::arrow::ProjectionMask;
23use crate::arrow::array_reader::{ArrayReaderBuilder, RowGroupCache};
24use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
25use crate::arrow::arrow_reader::{
26    ParquetRecordBatchReader, ReadPlanBuilder, RowFilter, RowSelection,
27};
28use crate::arrow::in_memory_row_group::ColumnChunkData;
29use crate::arrow::push_decoder::reader_builder::data::DataRequestBuilder;
30use crate::arrow::push_decoder::reader_builder::filter::CacheInfo;
31use crate::arrow::schema::ParquetField;
32use crate::errors::ParquetError;
33use crate::file::metadata::ParquetMetaData;
34use crate::util::push_buffers::PushBuffers;
35use bytes::Bytes;
36use data::DataRequest;
37use filter::AdvanceResult;
38use filter::FilterInfo;
39use std::ops::Range;
40use std::sync::{Arc, Mutex};
41
/// Identifies the row group currently being decoded and carries the read plan
/// that is being built up for it.
#[derive(Debug)]
struct RowGroupInfo {
    /// Index of the row group, used to look it up in the file metadata
    row_group_idx: usize,
    /// Row count supplied for the row group; used as the selected-row count
    /// when the plan has no explicit selection
    row_count: usize,
    /// In-progress read plan (selection, predicates, limit/offset) for the row group
    plan_builder: ReadPlanBuilder,
}
49
/// States of the per-row-group decoding state machine.
///
/// The variants are listed in the order they are normally visited:
/// `Start` -> (`Filters` -> `WaitingOnFilterData`)* -> `StartData` ->
/// `WaitingOnData` -> `Finished`. The filter states are skipped when there is
/// no filter or the filter has no predicates.
#[derive(Debug)]
enum RowGroupDecoderState {
    /// A row group has been registered but no work has been done on it yet
    Start {
        row_group_info: RowGroupInfo,
    },
    /// About to request the data needed to evaluate the current predicate
    Filters {
        row_group_info: RowGroupInfo,
        /// Column chunks carried over from the previous predicate evaluation;
        /// `None` when coming straight from `Start`
        column_chunks: Option<Vec<Option<Arc<ColumnChunkData>>>>,
        /// The filter being evaluated, together with its cache information
        filter_info: FilterInfo,
    },
    /// Waiting until the buffers hold the data for the current predicate
    WaitingOnFilterData {
        row_group_info: RowGroupInfo,
        filter_info: FilterInfo,
        /// Describes the data required to evaluate the current predicate
        data_request: DataRequest,
    },
    /// All predicates (if any) have been applied; about to request the data
    /// for the output projection
    StartData {
        row_group_info: RowGroupInfo,
        /// Column chunks carried over from predicate evaluation, if any
        column_chunks: Option<Vec<Option<Arc<ColumnChunkData>>>>,
        /// Cache populated during predicate evaluation; `None` when no
        /// predicates were evaluated
        cache_info: Option<CacheInfo>,
    },
    /// Waiting until the buffers hold the data for the output projection
    WaitingOnData {
        row_group_info: RowGroupInfo,
        /// Describes the data required to decode the output projection
        data_request: DataRequest,
        cache_info: Option<CacheInfo>,
    },
    /// No row group is in progress; this is also the initial state
    Finished,
}
87
/// Outcome of a single state transition: the state to store next and,
/// optionally, a result to hand back to the caller.
#[derive(Debug)]
struct NextState {
    /// State the decoder should be in after this transition
    next_state: RowGroupDecoderState,
    /// `Some` when the transition produced something to return to the caller;
    /// `None` when another transition should be attempted right away
    result: Option<DecodeResult<ParquetRecordBatchReader>>,
}
98
99impl NextState {
100    fn again(next_state: RowGroupDecoderState) -> Self {
104        Self {
105            next_state,
106            result: None,
107        }
108    }
109
110    fn result(
112        next_state: RowGroupDecoderState,
113        result: DecodeResult<ParquetRecordBatchReader>,
114    ) -> Self {
115        Self {
116            next_state,
117            result: Some(result),
118        }
119    }
120}
121
/// Incrementally builds a [`ParquetRecordBatchReader`] for one row group at a
/// time, asking the caller for more bytes whenever the buffered data is not
/// sufficient to make progress.
#[derive(Debug)]
pub(crate) struct RowGroupReaderBuilder {
    /// Batch size handed to the read plan, the predicate cache and data requests
    batch_size: usize,

    /// Columns to decode for the final output
    projection: ProjectionMask,

    /// Metadata of the file being decoded
    metadata: Arc<ParquetMetaData>,

    /// Field tree passed to the array reader builder, if any
    fields: Option<Arc<ParquetField>>,

    /// Optional row filter. It is taken out while the predicates of a row
    /// group are being evaluated and put back once they are done.
    filter: Option<RowFilter>,

    /// Remaining row limit; reduced as row groups are planned
    limit: Option<usize>,

    /// Remaining row offset; reduced as row groups are planned
    offset: Option<usize>,

    /// Size bound handed to each row group's predicate cache
    max_predicate_cache_size: usize,

    /// Metrics handed to every array reader that is built
    metrics: ArrowReaderMetrics,

    /// Current decoder state. Wrapped in `Option` so it can be moved out while
    /// a transition runs; it is `None` only during a transition or after a
    /// transition returned an error.
    state: Option<RowGroupDecoderState>,

    /// Bytes pushed by the caller that have not been consumed yet
    buffers: PushBuffers,
}
167
168impl RowGroupReaderBuilder {
169    #[expect(clippy::too_many_arguments)]
171    pub(crate) fn new(
172        batch_size: usize,
173        projection: ProjectionMask,
174        metadata: Arc<ParquetMetaData>,
175        fields: Option<Arc<ParquetField>>,
176        filter: Option<RowFilter>,
177        limit: Option<usize>,
178        offset: Option<usize>,
179        metrics: ArrowReaderMetrics,
180        max_predicate_cache_size: usize,
181        buffers: PushBuffers,
182    ) -> Self {
183        Self {
184            batch_size,
185            projection,
186            metadata,
187            fields,
188            filter,
189            limit,
190            offset,
191            metrics,
192            max_predicate_cache_size,
193            state: Some(RowGroupDecoderState::Finished),
194            buffers,
195        }
196    }
197
    /// Hands newly available bytes to the internal buffers.
    ///
    /// `ranges` and `buffers` are forwarded unchanged to the buffer store.
    // NOTE(review): presumably `ranges[i]` is the file range covered by
    // `buffers[i]` — confirm against `PushBuffers::push_ranges`.
    pub fn push_data(&mut self, ranges: Vec<Range<u64>>, buffers: Vec<Bytes>) {
        self.buffers.push_ranges(ranges, buffers);
    }
202
    /// Returns the byte count reported by the internal buffers.
    pub fn buffered_bytes(&self) -> u64 {
        self.buffers.buffered_bytes()
    }
207
208    fn take_state(&mut self) -> Result<RowGroupDecoderState, ParquetError> {
216        self.state.take().ok_or_else(|| {
217            ParquetError::General(String::from(
218                "Internal Error: RowGroupReader in invalid state",
219            ))
220        })
221    }
222
223    pub(crate) fn next_row_group(
225        &mut self,
226        row_group_idx: usize,
227        row_count: usize,
228        selection: Option<RowSelection>,
229    ) -> Result<(), ParquetError> {
230        let state = self.take_state()?;
231        if !matches!(state, RowGroupDecoderState::Finished) {
232            return Err(ParquetError::General(format!(
233                "Internal Error: next_row_group called while still reading a row group. Expected Finished state, got {state:?}"
234            )));
235        }
236        let plan_builder = ReadPlanBuilder::new(self.batch_size).with_selection(selection);
237
238        let row_group_info = RowGroupInfo {
239            row_group_idx,
240            row_count,
241            plan_builder,
242        };
243
244        self.state = Some(RowGroupDecoderState::Start { row_group_info });
245        Ok(())
246    }
247
248    pub(crate) fn try_build(
256        &mut self,
257    ) -> Result<DecodeResult<ParquetRecordBatchReader>, ParquetError> {
258        loop {
259            let current_state = self.take_state()?;
260            match self.try_transition(current_state)? {
262                NextState {
264                    next_state,
265                    result: Some(result),
266                } => {
267                    self.state = Some(next_state);
269                    return Ok(result);
270                }
271                NextState {
273                    next_state,
274                    result: None,
275                } => {
276                    self.state = Some(next_state);
278                }
279            }
280        }
281    }
282
283    fn try_transition(
293        &mut self,
294        current_state: RowGroupDecoderState,
295    ) -> Result<NextState, ParquetError> {
296        let result = match current_state {
297            RowGroupDecoderState::Start { row_group_info } => {
298                let column_chunks = None; let Some(filter) = self.filter.take() else {
301                    return Ok(NextState::again(RowGroupDecoderState::StartData {
303                        row_group_info,
304                        column_chunks,
305                        cache_info: None,
306                    }));
307                };
308                if filter.predicates.is_empty() {
310                    return Ok(NextState::again(RowGroupDecoderState::StartData {
311                        row_group_info,
312                        column_chunks,
313                        cache_info: None,
314                    }));
315                };
316
317                let cache_projection =
319                    self.compute_cache_projection(row_group_info.row_group_idx, &filter);
320
321                let cache_info = CacheInfo::new(
322                    cache_projection,
323                    Arc::new(Mutex::new(RowGroupCache::new(
324                        self.batch_size,
325                        self.max_predicate_cache_size,
326                    ))),
327                );
328
329                let filter_info = FilterInfo::new(filter, cache_info);
330                NextState::again(RowGroupDecoderState::Filters {
331                    row_group_info,
332                    filter_info,
333                    column_chunks,
334                })
335            }
336            RowGroupDecoderState::Filters {
338                row_group_info,
339                column_chunks,
340                filter_info,
341            } => {
342                let RowGroupInfo {
343                    row_group_idx,
344                    row_count,
345                    plan_builder,
346                } = row_group_info;
347
348                if !plan_builder.selects_any() {
350                    self.filter = Some(filter_info.into_filter());
352                    return Ok(NextState::result(
353                        RowGroupDecoderState::Finished,
354                        DecodeResult::Finished,
355                    ));
356                }
357
358                let predicate = filter_info.current();
360
361                let data_request = DataRequestBuilder::new(
364                    row_group_idx,
365                    row_count,
366                    self.batch_size,
367                    &self.metadata,
368                    predicate.projection(), )
370                .with_selection(plan_builder.selection())
371                .with_cache_projection(Some(filter_info.cache_projection()))
373                .with_column_chunks(column_chunks)
374                .build();
375
376                let row_group_info = RowGroupInfo {
377                    row_group_idx,
378                    row_count,
379                    plan_builder,
380                };
381
382                NextState::again(RowGroupDecoderState::WaitingOnFilterData {
383                    row_group_info,
384                    filter_info,
385                    data_request,
386                })
387            }
388            RowGroupDecoderState::WaitingOnFilterData {
389                row_group_info,
390                data_request,
391                mut filter_info,
392            } => {
393                let needed_ranges = data_request.needed_ranges(&self.buffers);
395                if !needed_ranges.is_empty() {
396                    return Ok(NextState::result(
398                        RowGroupDecoderState::WaitingOnFilterData {
399                            row_group_info,
400                            filter_info,
401                            data_request,
402                        },
403                        DecodeResult::NeedsData(needed_ranges),
404                    ));
405                }
406
407                let RowGroupInfo {
409                    row_group_idx,
410                    row_count,
411                    mut plan_builder,
412                } = row_group_info;
413
414                let predicate = filter_info.current();
415
416                let row_group = data_request.try_into_in_memory_row_group(
417                    row_group_idx,
418                    row_count,
419                    &self.metadata,
420                    predicate.projection(),
421                    &mut self.buffers,
422                )?;
423
424                let cache_options = filter_info.cache_builder().producer();
425
426                let array_reader = ArrayReaderBuilder::new(&row_group, &self.metrics)
427                    .with_cache_options(Some(&cache_options))
428                    .build_array_reader(self.fields.as_deref(), predicate.projection())?;
429
430                plan_builder =
431                    plan_builder.with_predicate(array_reader, filter_info.current_mut())?;
432
433                let row_group_info = RowGroupInfo {
434                    row_group_idx,
435                    row_count,
436                    plan_builder,
437                };
438
439                let column_chunks = Some(row_group.column_chunks);
441
442                match filter_info.advance() {
444                    AdvanceResult::Continue(filter_info) => {
445                        NextState::again(RowGroupDecoderState::Filters {
446                            row_group_info,
447                            column_chunks,
448                            filter_info,
449                        })
450                    }
451                    AdvanceResult::Done(filter, cache_info) => {
453                        assert!(self.filter.is_none());
455                        self.filter = Some(filter);
456                        NextState::again(RowGroupDecoderState::StartData {
457                            row_group_info,
458                            column_chunks,
459                            cache_info: Some(cache_info),
460                        })
461                    }
462                }
463            }
464            RowGroupDecoderState::StartData {
465                row_group_info,
466                column_chunks,
467                cache_info,
468            } => {
469                let RowGroupInfo {
470                    row_group_idx,
471                    row_count,
472                    plan_builder,
473                } = row_group_info;
474
475                let rows_before = plan_builder.num_rows_selected().unwrap_or(row_count);
477
478                if rows_before == 0 {
479                    return Ok(NextState::result(
481                        RowGroupDecoderState::Finished,
482                        DecodeResult::Finished,
483                    ));
484                }
485
486                let plan_builder = plan_builder
488                    .limited(row_count)
489                    .with_offset(self.offset)
490                    .with_limit(self.limit)
491                    .build_limited();
492
493                let rows_after = plan_builder.num_rows_selected().unwrap_or(row_count);
494
495                if let Some(offset) = &mut self.offset {
497                    *offset = offset.saturating_sub(rows_before - rows_after)
500                }
501
502                if rows_after == 0 {
503                    return Ok(NextState::result(
505                        RowGroupDecoderState::Finished,
506                        DecodeResult::Finished,
507                    ));
508                }
509
510                if let Some(limit) = &mut self.limit {
511                    *limit -= rows_after;
512                }
513
514                let data_request = DataRequestBuilder::new(
515                    row_group_idx,
516                    row_count,
517                    self.batch_size,
518                    &self.metadata,
519                    &self.projection,
520                )
521                .with_selection(plan_builder.selection())
522                .with_column_chunks(column_chunks)
523                .build();
526
527                let row_group_info = RowGroupInfo {
528                    row_group_idx,
529                    row_count,
530                    plan_builder,
531                };
532
533                NextState::again(RowGroupDecoderState::WaitingOnData {
534                    row_group_info,
535                    data_request,
536                    cache_info,
537                })
538            }
539            RowGroupDecoderState::WaitingOnData {
541                row_group_info,
542                data_request,
543                cache_info,
544            } => {
545                let needed_ranges = data_request.needed_ranges(&self.buffers);
546                if !needed_ranges.is_empty() {
547                    return Ok(NextState::result(
549                        RowGroupDecoderState::WaitingOnData {
550                            row_group_info,
551                            data_request,
552                            cache_info,
553                        },
554                        DecodeResult::NeedsData(needed_ranges),
555                    ));
556                }
557
558                let RowGroupInfo {
560                    row_group_idx,
561                    row_count,
562                    plan_builder,
563                } = row_group_info;
564
565                let row_group = data_request.try_into_in_memory_row_group(
566                    row_group_idx,
567                    row_count,
568                    &self.metadata,
569                    &self.projection,
570                    &mut self.buffers,
571                )?;
572
573                let plan = plan_builder.build();
574
575                let array_reader_builder = ArrayReaderBuilder::new(&row_group, &self.metrics);
577                let array_reader = if let Some(cache_info) = cache_info.as_ref() {
578                    let cache_options = cache_info.builder().consumer();
579                    array_reader_builder
580                        .with_cache_options(Some(&cache_options))
581                        .build_array_reader(self.fields.as_deref(), &self.projection)
582                } else {
583                    array_reader_builder
584                        .build_array_reader(self.fields.as_deref(), &self.projection)
585                }?;
586
587                let reader = ParquetRecordBatchReader::new(array_reader, plan);
588                NextState::result(RowGroupDecoderState::Finished, DecodeResult::Data(reader))
589            }
590            RowGroupDecoderState::Finished => {
591                NextState::result(RowGroupDecoderState::Finished, DecodeResult::Finished)
593            }
594        };
595        Ok(result)
596    }
597
598    fn compute_cache_projection(&self, row_group_idx: usize, filter: &RowFilter) -> ProjectionMask {
603        let meta = self.metadata.row_group(row_group_idx);
604        match self.compute_cache_projection_inner(filter) {
605            Some(projection) => projection,
606            None => ProjectionMask::none(meta.columns().len()),
607        }
608    }
609
610    fn compute_cache_projection_inner(&self, filter: &RowFilter) -> Option<ProjectionMask> {
611        let mut cache_projection = filter.predicates.first()?.projection().clone();
612        for predicate in filter.predicates.iter() {
613            cache_projection.union(predicate.projection());
614        }
615        cache_projection.intersect(&self.projection);
616        self.exclude_nested_columns_from_cache(&cache_projection)
617    }
618
619    fn exclude_nested_columns_from_cache(&self, mask: &ProjectionMask) -> Option<ProjectionMask> {
621        let schema = self.metadata.file_metadata().schema_descr();
622        let num_leaves = schema.num_columns();
623
624        let num_roots = schema.root_schema().get_fields().len();
626        let mut root_leaf_counts = vec![0usize; num_roots];
627        for leaf_idx in 0..num_leaves {
628            let root_idx = schema.get_column_root_idx(leaf_idx);
629            root_leaf_counts[root_idx] += 1;
630        }
631
632        let mut included_leaves = Vec::new();
634        for leaf_idx in 0..num_leaves {
635            if mask.leaf_included(leaf_idx) {
636                let root_idx = schema.get_column_root_idx(leaf_idx);
637                if root_leaf_counts[root_idx] == 1 {
638                    included_leaves.push(leaf_idx);
639                }
640            }
641        }
642
643        if included_leaves.is_empty() {
644            None
645        } else {
646            Some(ProjectionMask::leaves(schema, included_leaves))
647        }
648    }
649}
650
#[cfg(test)]
mod tests {
    use super::*;
    /// Pins the in-memory size of `RowGroupDecoderState` so that any change to
    /// it has to be made deliberately by updating the expected value.
    // NOTE(review): the expected value depends on the target and on the sizes
    // of the project types stored in the variants — confirm when it changes.
    #[test]
    fn test_structure_size() {
        assert_eq!(std::mem::size_of::<RowGroupDecoderState>(), 184);
    }
}