Skip to main content

parquet/arrow/push_decoder/reader_builder/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18mod data;
19mod filter;
20
21use crate::DecodeResult;
22use crate::arrow::ProjectionMask;
23use crate::arrow::array_reader::{ArrayReaderBuilder, CacheOptions, RowGroupCache};
24use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
25use crate::arrow::arrow_reader::selection::RowSelectionStrategy;
26use crate::arrow::arrow_reader::{
27    ParquetRecordBatchReader, ReadPlanBuilder, RowFilter, RowSelection, RowSelectionPolicy,
28};
29use crate::arrow::in_memory_row_group::ColumnChunkData;
30use crate::arrow::push_decoder::reader_builder::data::DataRequestBuilder;
31use crate::arrow::push_decoder::reader_builder::filter::CacheInfo;
32use crate::arrow::schema::ParquetField;
33use crate::errors::ParquetError;
34use crate::file::metadata::ParquetMetaData;
35use crate::file::page_index::offset_index::OffsetIndexMetaData;
36use crate::util::push_buffers::PushBuffers;
37use bytes::Bytes;
38use data::DataRequest;
39use filter::AdvanceResult;
40use filter::FilterInfo;
41use std::ops::Range;
42use std::sync::{Arc, RwLock};
43
/// The current row group being read and the read plan
///
/// This bundle is carried from state to state by [`RowGroupDecoderState`]
/// while a single row group is being decoded.
#[derive(Debug)]
struct RowGroupInfo {
    /// Index of the row group, used to look up its metadata and offset index
    row_group_idx: usize,
    /// Row count supplied for this row group; used as the number of selected
    /// rows when the plan has no explicit selection
    row_count: usize,
    /// Plan under construction: holds the current row selection, which is
    /// narrowed by each predicate and by limit / offset
    plan_builder: ReadPlanBuilder,
}
51
/// This is the inner state machine for reading a single row group.
///
/// Transitions (see `RowGroupReaderBuilder::try_transition`):
///
/// * `Start` -> `Filters` (predicates to evaluate) or `StartData` (none)
/// * `Filters` -> `WaitingOnFilterData`, or `Finished` if nothing is selected
/// * `WaitingOnFilterData` -> `Filters` (more predicates) or `StartData`
/// * `StartData` -> `WaitingOnData`, or `Finished` if no rows remain
/// * `WaitingOnData` -> `Finished` (after producing the reader)
#[derive(Debug)]
enum RowGroupDecoderState {
    /// A row group has been set up but no planning has happened yet
    Start {
        row_group_info: RowGroupInfo,
    },
    /// Planning filters, but haven't yet requested data to evaluate them
    Filters {
        row_group_info: RowGroupInfo,
        /// Any previously read column chunk data from prior filters
        column_chunks: Option<Vec<Option<Arc<ColumnChunkData>>>>,
        filter_info: FilterInfo,
    },
    /// Needs data to evaluate current filter
    WaitingOnFilterData {
        row_group_info: RowGroupInfo,
        filter_info: FilterInfo,
        data_request: DataRequest,
    },
    /// Know what data to actually read, after all predicates
    StartData {
        row_group_info: RowGroupInfo,
        /// Any previously read column chunk data from the filtering phase
        column_chunks: Option<Vec<Option<Arc<ColumnChunkData>>>>,
        /// Any cached filter results
        cache_info: Option<CacheInfo>,
    },
    /// Needs data to proceed with reading the output
    WaitingOnData {
        row_group_info: RowGroupInfo,
        data_request: DataRequest,
        /// Any cached filter results
        cache_info: Option<CacheInfo>,
    },
    /// Finished (or not yet started) reading this group
    ///
    /// This is also the initial state of a newly created builder.
    Finished,
}
89
/// Result of a state transition
///
/// Pairs the state the decoder should move to with an optional result that
/// should be handed back to the caller.
#[derive(Debug)]
struct NextState {
    /// The state to store back into the decoder
    next_state: RowGroupDecoderState,
    /// result to return, if any
    ///
    /// * `Some`: the processing should stop and return the result
    /// * `None`: processing should continue
    result: Option<DecodeResult<ParquetRecordBatchReader>>,
}
100
101impl NextState {
102    /// The next state with no result.
103    ///
104    /// This indicates processing should continue
105    fn again(next_state: RowGroupDecoderState) -> Self {
106        Self {
107            next_state,
108            result: None,
109        }
110    }
111
112    /// Create a NextState with a result that should be returned
113    fn result(
114        next_state: RowGroupDecoderState,
115        result: DecodeResult<ParquetRecordBatchReader>,
116    ) -> Self {
117        Self {
118            next_state,
119            result: Some(result),
120        }
121    }
122}
123
/// Builder for [`ParquetRecordBatchReader`] for a single row group
///
/// This struct drives the main state machine for decoding each row group -- it
/// determines what data is needed, and then assembles the
/// `ParquetRecordBatchReader` when all data is available.
#[derive(Debug)]
pub(crate) struct RowGroupReaderBuilder {
    /// The output batch size
    batch_size: usize,

    /// What columns to project (produce in each output batch)
    projection: ProjectionMask,

    /// The Parquet file metadata
    metadata: Arc<ParquetMetaData>,

    /// Top level parquet schema and arrow schema mapping
    fields: Option<Arc<ParquetField>>,

    /// Optional filter
    ///
    /// This is taken (left as `None`) while the predicates of a row group
    /// are being evaluated and put back once they are done.
    filter: Option<RowFilter>,

    /// Limit to apply to remaining row groups (decremented as rows are read)
    limit: Option<usize>,

    /// Offset to apply to remaining row groups (decremented as rows are read)
    offset: Option<usize>,

    /// The size in bytes of the predicate cache to use
    ///
    /// A value of `0` disables computing a cache projection.
    ///
    /// See [`RowGroupCache`] for details.
    max_predicate_cache_size: usize,

    /// The metrics collector
    metrics: ArrowReaderMetrics,

    /// Strategy for materialising row selections
    row_selection_policy: RowSelectionPolicy,

    /// Current state of the decoder.
    ///
    /// It is taken when processing and must be put back before returning;
    /// it is a bug if it is not put back after transitioning states.
    state: Option<RowGroupDecoderState>,

    /// The underlying data store
    buffers: PushBuffers,
}
172
173impl RowGroupReaderBuilder {
174    /// Create a new RowGroupReaderBuilder
175    #[expect(clippy::too_many_arguments)]
176    pub(crate) fn new(
177        batch_size: usize,
178        projection: ProjectionMask,
179        metadata: Arc<ParquetMetaData>,
180        fields: Option<Arc<ParquetField>>,
181        filter: Option<RowFilter>,
182        limit: Option<usize>,
183        offset: Option<usize>,
184        metrics: ArrowReaderMetrics,
185        max_predicate_cache_size: usize,
186        buffers: PushBuffers,
187        row_selection_policy: RowSelectionPolicy,
188    ) -> Self {
189        Self {
190            batch_size,
191            projection,
192            metadata,
193            fields,
194            filter,
195            limit,
196            offset,
197            metrics,
198            max_predicate_cache_size,
199            row_selection_policy,
200            state: Some(RowGroupDecoderState::Finished),
201            buffers,
202        }
203    }
204
    /// Push new data buffers that can be used to satisfy pending requests
    ///
    /// Both arguments are forwarded unchanged to the underlying
    /// [`PushBuffers`]. NOTE(review): presumably `ranges[i]` is the file byte
    /// range covered by `buffers[i]` -- confirm against `PushBuffers::push_ranges`.
    pub fn push_data(&mut self, ranges: Vec<Range<u64>>, buffers: Vec<Bytes>) {
        self.buffers.push_ranges(ranges, buffers);
    }
209
    /// Returns the total number of buffered bytes available
    ///
    /// This is the value reported by the underlying [`PushBuffers`].
    pub fn buffered_bytes(&self) -> u64 {
        self.buffers.buffered_bytes()
    }
214
215    /// take the current state, leaving None in its place.
216    ///
217    /// Returns an error if there the state wasn't put back after the previous
218    /// call to [`Self::take_state`].
219    ///
220    /// Any code that calls this method must ensure that the state is put back
221    /// before returning, otherwise the reader will error next time it is called
222    fn take_state(&mut self) -> Result<RowGroupDecoderState, ParquetError> {
223        self.state.take().ok_or_else(|| {
224            ParquetError::General(String::from(
225                "Internal Error: RowGroupReader in invalid state",
226            ))
227        })
228    }
229
230    /// Setup this reader to read the next row group
231    pub(crate) fn next_row_group(
232        &mut self,
233        row_group_idx: usize,
234        row_count: usize,
235        selection: Option<RowSelection>,
236    ) -> Result<(), ParquetError> {
237        let state = self.take_state()?;
238        if !matches!(state, RowGroupDecoderState::Finished) {
239            return Err(ParquetError::General(format!(
240                "Internal Error: next_row_group called while still reading a row group. Expected Finished state, got {state:?}"
241            )));
242        }
243        let plan_builder = ReadPlanBuilder::new(self.batch_size)
244            .with_selection(selection)
245            .with_row_selection_policy(self.row_selection_policy);
246
247        let row_group_info = RowGroupInfo {
248            row_group_idx,
249            row_count,
250            plan_builder,
251        };
252
253        self.state = Some(RowGroupDecoderState::Start { row_group_info });
254        Ok(())
255    }
256
257    /// Try to build the next `ParquetRecordBatchReader` from this RowGroupReader.
258    ///
259    /// If more data is needed, returns [`DecodeResult::NeedsData`] with the
260    /// ranges of data that are needed to proceed.
261    ///
262    /// If a [`ParquetRecordBatchReader`] is ready, it is returned in
263    /// `DecodeResult::Data`.
264    pub(crate) fn try_build(
265        &mut self,
266    ) -> Result<DecodeResult<ParquetRecordBatchReader>, ParquetError> {
267        loop {
268            let current_state = self.take_state()?;
269            // Try to transition the decoder.
270            match self.try_transition(current_state)? {
271                // Either produced a batch reader, needed input, or finished
272                NextState {
273                    next_state,
274                    result: Some(result),
275                } => {
276                    // put back the next state
277                    self.state = Some(next_state);
278                    return Ok(result);
279                }
280                // completed one internal state, maybe can proceed further
281                NextState {
282                    next_state,
283                    result: None,
284                } => {
285                    // continue processing
286                    self.state = Some(next_state);
287                }
288            }
289        }
290    }
291
292    /// Current state --> next state + optional output
293    ///
294    /// This is the main state transition function for the row group reader
295    /// and encodes the row group decoding state machine.
296    ///
297    /// # Notes
298    ///
299    /// This structure is used to reduce the indentation level of the main loop
300    /// in try_build
301    fn try_transition(
302        &mut self,
303        current_state: RowGroupDecoderState,
304    ) -> Result<NextState, ParquetError> {
305        let result = match current_state {
306            RowGroupDecoderState::Start { row_group_info } => {
307                let column_chunks = None; // no prior column chunks
308
309                let Some(filter) = self.filter.take() else {
310                    // no filter, start trying to read data immediately
311                    return Ok(NextState::again(RowGroupDecoderState::StartData {
312                        row_group_info,
313                        column_chunks,
314                        cache_info: None,
315                    }));
316                };
317                // no predicates in filter, so start reading immediately
318                if filter.predicates.is_empty() {
319                    return Ok(NextState::again(RowGroupDecoderState::StartData {
320                        row_group_info,
321                        column_chunks,
322                        cache_info: None,
323                    }));
324                };
325
326                // we have predicates to evaluate
327                let cache_projection =
328                    self.compute_cache_projection(row_group_info.row_group_idx, &filter);
329
330                let cache_info = CacheInfo::new(
331                    cache_projection,
332                    Arc::new(RwLock::new(RowGroupCache::new(
333                        self.batch_size,
334                        self.max_predicate_cache_size,
335                    ))),
336                );
337
338                let filter_info = FilterInfo::new(filter, cache_info);
339                NextState::again(RowGroupDecoderState::Filters {
340                    row_group_info,
341                    filter_info,
342                    column_chunks,
343                })
344            }
345            // need to evaluate filters
346            RowGroupDecoderState::Filters {
347                row_group_info,
348                column_chunks,
349                filter_info,
350            } => {
351                let RowGroupInfo {
352                    row_group_idx,
353                    row_count,
354                    plan_builder,
355                } = row_group_info;
356
357                // If nothing is selected, we are done with this row group
358                if !plan_builder.selects_any() {
359                    // ruled out entire row group
360                    self.filter = Some(filter_info.into_filter());
361                    return Ok(NextState::result(
362                        RowGroupDecoderState::Finished,
363                        DecodeResult::Finished,
364                    ));
365                }
366
367                // Make a request for the data needed to evaluate the current predicate
368                let predicate = filter_info.current();
369
370                // need to fetch pages the column needs for decoding, figure
371                // that out based on the current selection and projection
372                let data_request = DataRequestBuilder::new(
373                    row_group_idx,
374                    row_count,
375                    self.batch_size,
376                    &self.metadata,
377                    predicate.projection(), // use the predicate's projection
378                )
379                .with_selection(plan_builder.selection())
380                // Fetch predicate columns; expand selection only for cached predicate columns
381                .with_cache_projection(Some(filter_info.cache_projection()))
382                .with_column_chunks(column_chunks)
383                .build();
384
385                let row_group_info = RowGroupInfo {
386                    row_group_idx,
387                    row_count,
388                    plan_builder,
389                };
390
391                NextState::again(RowGroupDecoderState::WaitingOnFilterData {
392                    row_group_info,
393                    filter_info,
394                    data_request,
395                })
396            }
397            RowGroupDecoderState::WaitingOnFilterData {
398                row_group_info,
399                data_request,
400                mut filter_info,
401            } => {
402                // figure out what ranges we still need
403                let needed_ranges = data_request.needed_ranges(&self.buffers);
404                if !needed_ranges.is_empty() {
405                    // still need data
406                    return Ok(NextState::result(
407                        RowGroupDecoderState::WaitingOnFilterData {
408                            row_group_info,
409                            filter_info,
410                            data_request,
411                        },
412                        DecodeResult::NeedsData(needed_ranges),
413                    ));
414                }
415
416                // otherwise we have all the data we need to evaluate the predicate
417                let RowGroupInfo {
418                    row_group_idx,
419                    row_count,
420                    mut plan_builder,
421                } = row_group_info;
422
423                let predicate = filter_info.current();
424
425                let row_group = data_request.try_into_in_memory_row_group(
426                    row_group_idx,
427                    row_count,
428                    &self.metadata,
429                    predicate.projection(),
430                    &mut self.buffers,
431                )?;
432
433                let cache_options = filter_info.cache_builder().producer();
434
435                let array_reader = ArrayReaderBuilder::new(&row_group, &self.metrics)
436                    .with_cache_options(Some(&cache_options))
437                    .with_parquet_metadata(&self.metadata)
438                    .build_array_reader(self.fields.as_deref(), predicate.projection())?;
439
440                // Reset to original policy before each predicate so the override
441                // can detect page skipping for THIS predicate's columns.
442                // Without this reset, a prior predicate's override (e.g. Mask)
443                // carries forward and the check returns early, missing unfetched
444                // pages for subsequent predicates.
445                plan_builder = plan_builder.with_row_selection_policy(self.row_selection_policy);
446
447                // Prepare to evaluate the filter.
448                // Note: first update the selection strategy to properly handle any pages
449                // pruned during fetch
450                plan_builder = override_selector_strategy_if_needed(
451                    plan_builder,
452                    predicate.projection(),
453                    self.row_group_offset_index(row_group_idx),
454                );
455                // `with_predicate` actually evaluates the filter
456
457                plan_builder =
458                    plan_builder.with_predicate(array_reader, filter_info.current_mut())?;
459
460                let row_group_info = RowGroupInfo {
461                    row_group_idx,
462                    row_count,
463                    plan_builder,
464                };
465
466                // Take back the column chunks that were read
467                let column_chunks = Some(row_group.column_chunks);
468
469                // advance to the next predicate, if any
470                match filter_info.advance() {
471                    AdvanceResult::Continue(filter_info) => {
472                        NextState::again(RowGroupDecoderState::Filters {
473                            row_group_info,
474                            column_chunks,
475                            filter_info,
476                        })
477                    }
478                    // done with predicates, proceed to reading data
479                    AdvanceResult::Done(filter, cache_info) => {
480                        // remember we need to put back the filter
481                        assert!(self.filter.is_none());
482                        self.filter = Some(filter);
483                        NextState::again(RowGroupDecoderState::StartData {
484                            row_group_info,
485                            column_chunks,
486                            cache_info: Some(cache_info),
487                        })
488                    }
489                }
490            }
491            RowGroupDecoderState::StartData {
492                row_group_info,
493                column_chunks,
494                cache_info,
495            } => {
496                let RowGroupInfo {
497                    row_group_idx,
498                    row_count,
499                    plan_builder,
500                } = row_group_info;
501
502                // Compute the number of rows in the selection before applying limit and offset
503                let rows_before = plan_builder.num_rows_selected().unwrap_or(row_count);
504
505                if rows_before == 0 {
506                    // ruled out entire row group
507                    return Ok(NextState::result(
508                        RowGroupDecoderState::Finished,
509                        DecodeResult::Finished,
510                    ));
511                }
512
513                // Apply any limit and offset
514                let mut plan_builder = plan_builder
515                    .limited(row_count)
516                    .with_offset(self.offset)
517                    .with_limit(self.limit)
518                    .build_limited();
519
520                let rows_after = plan_builder.num_rows_selected().unwrap_or(row_count);
521
522                // Update running offset and limit for after the current row group is read
523                if let Some(offset) = &mut self.offset {
524                    // Reduction is either because of offset or limit, as limit is applied
525                    // after offset has been "exhausted" can just use saturating sub here
526                    *offset = offset.saturating_sub(rows_before - rows_after)
527                }
528
529                if rows_after == 0 {
530                    // no rows left after applying limit/offset
531                    return Ok(NextState::result(
532                        RowGroupDecoderState::Finished,
533                        DecodeResult::Finished,
534                    ));
535                }
536
537                if let Some(limit) = &mut self.limit {
538                    *limit -= rows_after;
539                }
540
541                let data_request = DataRequestBuilder::new(
542                    row_group_idx,
543                    row_count,
544                    self.batch_size,
545                    &self.metadata,
546                    &self.projection,
547                )
548                .with_selection(plan_builder.selection())
549                .with_column_chunks(column_chunks)
550                // Final projection fetch shouldn't expand selection for cache
551                // so don't call with_cache_projection here
552                .build();
553
554                plan_builder = plan_builder.with_row_selection_policy(self.row_selection_policy);
555
556                plan_builder = override_selector_strategy_if_needed(
557                    plan_builder,
558                    &self.projection,
559                    self.row_group_offset_index(row_group_idx),
560                );
561
562                let row_group_info = RowGroupInfo {
563                    row_group_idx,
564                    row_count,
565                    plan_builder,
566                };
567
568                NextState::again(RowGroupDecoderState::WaitingOnData {
569                    row_group_info,
570                    data_request,
571                    cache_info,
572                })
573            }
574            // Waiting on data to proceed with reading the output
575            RowGroupDecoderState::WaitingOnData {
576                row_group_info,
577                data_request,
578                cache_info,
579            } => {
580                let needed_ranges = data_request.needed_ranges(&self.buffers);
581                if !needed_ranges.is_empty() {
582                    // still need data
583                    return Ok(NextState::result(
584                        RowGroupDecoderState::WaitingOnData {
585                            row_group_info,
586                            data_request,
587                            cache_info,
588                        },
589                        DecodeResult::NeedsData(needed_ranges),
590                    ));
591                }
592
593                // otherwise we have all the data we need to proceed
594                let RowGroupInfo {
595                    row_group_idx,
596                    row_count,
597                    plan_builder,
598                } = row_group_info;
599
600                let row_group = data_request.try_into_in_memory_row_group(
601                    row_group_idx,
602                    row_count,
603                    &self.metadata,
604                    &self.projection,
605                    &mut self.buffers,
606                )?;
607
608                let plan = plan_builder.build();
609
610                // if we have any cached results, connect them up
611                let array_reader_builder = ArrayReaderBuilder::new(&row_group, &self.metrics)
612                    .with_parquet_metadata(&self.metadata);
613                let array_reader = if let Some(cache_info) = cache_info.as_ref() {
614                    let cache_options: CacheOptions = cache_info.builder().consumer();
615                    array_reader_builder
616                        .with_cache_options(Some(&cache_options))
617                        .build_array_reader(self.fields.as_deref(), &self.projection)
618                } else {
619                    array_reader_builder
620                        .build_array_reader(self.fields.as_deref(), &self.projection)
621                }?;
622
623                let reader = ParquetRecordBatchReader::new(array_reader, plan);
624                NextState::result(RowGroupDecoderState::Finished, DecodeResult::Data(reader))
625            }
626            RowGroupDecoderState::Finished => {
627                // nothing left to read
628                NextState::result(RowGroupDecoderState::Finished, DecodeResult::Finished)
629            }
630        };
631        Ok(result)
632    }
633
634    /// Which columns should be cached?
635    ///
636    /// Returns the columns that are used by the filters *and* then used in the
637    /// final projection, excluding any nested columns.
638    fn compute_cache_projection(&self, row_group_idx: usize, filter: &RowFilter) -> ProjectionMask {
639        let meta = self.metadata.row_group(row_group_idx);
640        match self.compute_cache_projection_inner(filter) {
641            Some(projection) => projection,
642            None => ProjectionMask::none(meta.columns().len()),
643        }
644    }
645
646    fn compute_cache_projection_inner(&self, filter: &RowFilter) -> Option<ProjectionMask> {
647        // Do not compute the projection mask if the predicate cache is disabled
648        if self.max_predicate_cache_size == 0 {
649            return None;
650        }
651        let mut cache_projection = filter.predicates.first()?.projection().clone();
652        for predicate in filter.predicates.iter() {
653            cache_projection.union(predicate.projection());
654        }
655        cache_projection.intersect(&self.projection);
656        self.exclude_nested_columns_from_cache(&cache_projection)
657    }
658
    /// Exclude leaves belonging to roots that span multiple parquet leaves (i.e. nested columns)
    ///
    /// Delegates to `ProjectionMask::without_nested_types` using the file's
    /// schema descriptor, and returns its result unchanged (which may be
    /// `None`).
    fn exclude_nested_columns_from_cache(&self, mask: &ProjectionMask) -> Option<ProjectionMask> {
        mask.without_nested_types(self.metadata.file_metadata().schema_descr())
    }
663
664    /// Get the offset index for the specified row group, if any
665    fn row_group_offset_index(&self, row_group_idx: usize) -> Option<&[OffsetIndexMetaData]> {
666        self.metadata
667            .offset_index()
668            .filter(|index| !index.is_empty())
669            .and_then(|index| index.get(row_group_idx))
670            .map(|columns| columns.as_slice())
671    }
672}
673
674/// Override the selection strategy if needed.
675///
676/// Some pages can be skipped during row-group construction if they are not read
677/// by the selections. This means that the data pages for those rows are never
678/// loaded and definition/repetition levels are never read. When using
679/// `RowSelections` selection works because `skip_records()` handles this
680/// case and skips the page accordingly.
681///
682/// However, with the current mask design, all values must be read and decoded
683/// and then a mask filter is applied. Thus if any pages are skipped during
684/// row-group construction, the data pages are missing and cannot be decoded.
685///
686/// A simple example:
687/// * the page size is 2, the mask is 100001, row selection should be read(1) skip(4) read(1)
688/// * the `ColumnChunkData` would be page1(10), page2(skipped), page3(01)
689///
690/// Using the row selection to skip(4), page2 won't be read at all, so in this
691/// case we can't decode all the rows and apply a mask. To correctly apply the
692/// bit mask, we need all 6 values be read, but page2 is not in memory.
693fn override_selector_strategy_if_needed(
694    plan_builder: ReadPlanBuilder,
695    projection_mask: &ProjectionMask,
696    offset_index: Option<&[OffsetIndexMetaData]>,
697) -> ReadPlanBuilder {
698    // override only applies to Auto policy, If the policy is already Mask or Selectors, respect that
699    let RowSelectionPolicy::Auto { .. } = plan_builder.row_selection_policy() else {
700        return plan_builder;
701    };
702
703    let preferred_strategy = plan_builder.resolve_selection_strategy();
704
705    let force_selectors = matches!(preferred_strategy, RowSelectionStrategy::Mask)
706        && plan_builder.selection().is_some_and(|selection| {
707            selection.should_force_selectors(projection_mask, offset_index)
708        });
709
710    let resolved_strategy = if force_selectors {
711        RowSelectionStrategy::Selectors
712    } else {
713        preferred_strategy
714    };
715
716    // override the plan builder strategy with the resolved one
717    let new_policy = match resolved_strategy {
718        RowSelectionStrategy::Mask => RowSelectionPolicy::Mask,
719        RowSelectionStrategy::Selectors => RowSelectionPolicy::Selectors,
720    };
721
722    plan_builder.with_row_selection_policy(new_policy)
723}
724
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    // Verify that the size of RowGroupDecoderState does not grow too large.
    //
    // The state is moved by value on every transition, so an unexpected size
    // change should be a deliberate decision: update the expected value here
    // when changing the enum on purpose.
    // NOTE(review): 200 presumably assumes a 64-bit target -- confirm.
    fn test_structure_size() {
        assert_eq!(std::mem::size_of::<RowGroupDecoderState>(), 200);
    }
}