parquet/arrow/push_decoder/reader_builder/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18mod data;
19mod filter;
20
21use crate::DecodeResult;
22use crate::arrow::ProjectionMask;
23use crate::arrow::array_reader::{ArrayReaderBuilder, RowGroupCache};
24use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
25use crate::arrow::arrow_reader::selection::RowSelectionStrategy;
26use crate::arrow::arrow_reader::{
27    ParquetRecordBatchReader, ReadPlanBuilder, RowFilter, RowSelection, RowSelectionPolicy,
28};
29use crate::arrow::in_memory_row_group::ColumnChunkData;
30use crate::arrow::push_decoder::reader_builder::data::DataRequestBuilder;
31use crate::arrow::push_decoder::reader_builder::filter::CacheInfo;
32use crate::arrow::schema::ParquetField;
33use crate::errors::ParquetError;
34use crate::file::metadata::ParquetMetaData;
35use crate::file::page_index::offset_index::OffsetIndexMetaData;
36use crate::util::push_buffers::PushBuffers;
37use bytes::Bytes;
38use data::DataRequest;
39use filter::AdvanceResult;
40use filter::FilterInfo;
41use std::ops::Range;
42use std::sync::{Arc, Mutex};
43
44/// The row group currently being decoded, together with the read plan that
/// is being built up for it.
///
/// This is carried by value from one [`RowGroupDecoderState`] to the next.
45#[derive(Debug)]
46struct RowGroupInfo {
    /// Index of the row group; used to look it up in the file's
    /// `ParquetMetaData` (row group metadata and offset index).
47    row_group_idx: usize,
    /// Row count supplied by the caller of `next_row_group`. Used as the
    /// number of selected rows when the plan reports no explicit selection.
48    row_count: usize,
    /// Plan under construction: created from the caller's selection and then
    /// updated with predicate results, offset and limit.
49    plan_builder: ReadPlanBuilder,
50}
51
52/// The inner state machine for reading a single row group.
///
/// `try_transition` consumes one state and produces the next. `Finished` is
/// both the initial state (set by `new`) and the terminal state.
53#[derive(Debug)]
54enum RowGroupDecoderState {
    /// A row group has been set up by `next_row_group`, nothing decided yet
55    Start {
56        row_group_info: RowGroupInfo,
57    },
58    /// Planning filters, but haven't yet requested data to evaluate them
59    Filters {
60        row_group_info: RowGroupInfo,
61        /// Any column chunk data already read while evaluating earlier filters
62        column_chunks: Option<Vec<Option<Arc<ColumnChunkData>>>>,
        /// The filter being evaluated (it owns the `RowFilter` taken from the builder)
63        filter_info: FilterInfo,
64    },
65    /// Waiting for the data needed to evaluate the current filter predicate
66    WaitingOnFilterData {
67        row_group_info: RowGroupInfo,
68        filter_info: FilterInfo,
        /// Describes which byte ranges the current predicate needs
69        data_request: DataRequest,
70    },
71    /// All predicates are done: we know what data to actually read
72    StartData {
73        row_group_info: RowGroupInfo,
74        /// Any column chunk data already read during the filtering phase
75        column_chunks: Option<Vec<Option<Arc<ColumnChunkData>>>>,
76        /// Cached filter results, `None` when no predicates were evaluated
77        cache_info: Option<CacheInfo>,
78    },
79    /// Waiting for the data needed to read the projected output columns
80    WaitingOnData {
81        row_group_info: RowGroupInfo,
        /// Describes which byte ranges the output projection needs
82        data_request: DataRequest,
83        /// Cached filter results, `None` when no predicates were evaluated
84        cache_info: Option<CacheInfo>,
85    },
86    /// Finished (or not yet started) reading this group
87    Finished,
88}
89
90/// Result of a single state transition: the state to store next, plus an
/// optional result to hand back to the caller of `try_build`.
91#[derive(Debug)]
92struct NextState {
    /// The state the decoder moves to
93    next_state: RowGroupDecoderState,
94    /// Result to return, if any
95    ///
96    /// * `Some`: processing stops and `try_build` returns this result
97    /// * `None`: `try_build` keeps transitioning from `next_state`
98    result: Option<DecodeResult<ParquetRecordBatchReader>>,
99}
100
101impl NextState {
102    /// The next state with no result.
103    ///
104    /// This indicates processing should continue
105    fn again(next_state: RowGroupDecoderState) -> Self {
106        Self {
107            next_state,
108            result: None,
109        }
110    }
111
112    /// Create a NextState with a result that should be returned
113    fn result(
114        next_state: RowGroupDecoderState,
115        result: DecodeResult<ParquetRecordBatchReader>,
116    ) -> Self {
117        Self {
118            next_state,
119            result: Some(result),
120        }
121    }
122}
123
124/// Builder for [`ParquetRecordBatchReader`] for a single row group
125///
126/// This struct drives the main state machine for decoding each row group -- it
127/// determines what data is needed, and then assembles the
128/// `ParquetRecordBatchReader` when all data is available.
///
/// The same builder is reused for successive row groups: call
/// `next_row_group` to set one up, then `try_build` until it yields a reader
/// or reports that the row group is finished.
129#[derive(Debug)]
130pub(crate) struct RowGroupReaderBuilder {
131    /// The output batch size
132    batch_size: usize,
133
134    /// What columns to project (produce in each output batch)
135    projection: ProjectionMask,
136
137    /// The Parquet file metadata
138    metadata: Arc<ParquetMetaData>,
139
140    /// Top level parquet schema and arrow schema mapping
141    fields: Option<Arc<ParquetField>>,
142
143    /// Optional row filter.
    ///
    /// It is moved out of this field while its predicates are evaluated for a
    /// row group and stored back once they are done (or the row group is
    /// ruled out). A filter with no predicates is dropped on first use.
144    filter: Option<RowFilter>,
145
146    /// Limit to apply to remaining row groups (decremented as rows are read)
147    limit: Option<usize>,
148
149    /// Offset to apply to remaining row groups (decremented as rows are read)
150    offset: Option<usize>,
151
152    /// The size in bytes of the predicate cache to use; `0` disables caching
153    ///
154    /// See [`RowGroupCache`] for details.
155    max_predicate_cache_size: usize,
156
157    /// The metrics collector
158    metrics: ArrowReaderMetrics,
159
160    /// Strategy for materialising row selections
161    row_selection_policy: RowSelectionPolicy,
162
163    /// Current state of the decoder.
164    ///
165    /// It is taken out while a transition is processed and must be put back
166    /// before returning; finding `None` here is reported as an internal error.
167    state: Option<RowGroupDecoderState>,
168
169    /// The underlying data store
170    buffers: PushBuffers,
171}
172
173impl RowGroupReaderBuilder {
174    /// Create a new RowGroupReaderBuilder
175    #[expect(clippy::too_many_arguments)]
176    pub(crate) fn new(
177        batch_size: usize,
178        projection: ProjectionMask,
179        metadata: Arc<ParquetMetaData>,
180        fields: Option<Arc<ParquetField>>,
181        filter: Option<RowFilter>,
182        limit: Option<usize>,
183        offset: Option<usize>,
184        metrics: ArrowReaderMetrics,
185        max_predicate_cache_size: usize,
186        buffers: PushBuffers,
187        row_selection_policy: RowSelectionPolicy,
188    ) -> Self {
189        Self {
190            batch_size,
191            projection,
192            metadata,
193            fields,
194            filter,
195            limit,
196            offset,
197            metrics,
198            max_predicate_cache_size,
199            row_selection_policy,
200            state: Some(RowGroupDecoderState::Finished),
201            buffers,
202        }
203    }
204
205    /// Push new data buffers that can be used to satisfy pending requests
206    pub fn push_data(&mut self, ranges: Vec<Range<u64>>, buffers: Vec<Bytes>) {
207        self.buffers.push_ranges(ranges, buffers);
208    }
209
210    /// Returns the total number of buffered bytes available
211    pub fn buffered_bytes(&self) -> u64 {
212        self.buffers.buffered_bytes()
213    }
214
215    /// take the current state, leaving None in its place.
216    ///
217    /// Returns an error if there the state wasn't put back after the previous
218    /// call to [`Self::take_state`].
219    ///
220    /// Any code that calls this method must ensure that the state is put back
221    /// before returning, otherwise the reader will error next time it is called
222    fn take_state(&mut self) -> Result<RowGroupDecoderState, ParquetError> {
223        self.state.take().ok_or_else(|| {
224            ParquetError::General(String::from(
225                "Internal Error: RowGroupReader in invalid state",
226            ))
227        })
228    }
229
230    /// Setup this reader to read the next row group
231    pub(crate) fn next_row_group(
232        &mut self,
233        row_group_idx: usize,
234        row_count: usize,
235        selection: Option<RowSelection>,
236    ) -> Result<(), ParquetError> {
237        let state = self.take_state()?;
238        if !matches!(state, RowGroupDecoderState::Finished) {
239            return Err(ParquetError::General(format!(
240                "Internal Error: next_row_group called while still reading a row group. Expected Finished state, got {state:?}"
241            )));
242        }
243        let plan_builder = ReadPlanBuilder::new(self.batch_size)
244            .with_selection(selection)
245            .with_row_selection_policy(self.row_selection_policy);
246
247        let row_group_info = RowGroupInfo {
248            row_group_idx,
249            row_count,
250            plan_builder,
251        };
252
253        self.state = Some(RowGroupDecoderState::Start { row_group_info });
254        Ok(())
255    }
256
257    /// Try to build the next `ParquetRecordBatchReader` from this RowGroupReader.
258    ///
259    /// If more data is needed, returns [`DecodeResult::NeedsData`] with the
260    /// ranges of data that are needed to proceed.
261    ///
262    /// If a [`ParquetRecordBatchReader`] is ready, it is returned in
263    /// `DecodeResult::Data`.
264    pub(crate) fn try_build(
265        &mut self,
266    ) -> Result<DecodeResult<ParquetRecordBatchReader>, ParquetError> {
267        loop {
268            let current_state = self.take_state()?;
269            // Try to transition the decoder.
270            match self.try_transition(current_state)? {
271                // Either produced a batch reader, needed input, or finished
272                NextState {
273                    next_state,
274                    result: Some(result),
275                } => {
276                    // put back the next state
277                    self.state = Some(next_state);
278                    return Ok(result);
279                }
280                // completed one internal state, maybe can proceed further
281                NextState {
282                    next_state,
283                    result: None,
284                } => {
285                    // continue processing
286                    self.state = Some(next_state);
287                }
288            }
289        }
290    }
291
292    /// Current state --> next state + optional output
293    ///
294    /// This is the main state transition function for the row group reader
295    /// and encodes the row group decoding state machine.
296    ///
297    /// # Notes
298    ///
299    /// This structure is used to reduce the indentation level of the main loop
300    /// in try_build
301    fn try_transition(
302        &mut self,
303        current_state: RowGroupDecoderState,
304    ) -> Result<NextState, ParquetError> {
305        let result = match current_state {
306            RowGroupDecoderState::Start { row_group_info } => {
307                let column_chunks = None; // no prior column chunks
308
309                let Some(filter) = self.filter.take() else {
310                    // no filter, start trying to read data immediately
311                    return Ok(NextState::again(RowGroupDecoderState::StartData {
312                        row_group_info,
313                        column_chunks,
314                        cache_info: None,
315                    }));
316                };
317                // no predicates in filter, so start reading immediately
318                if filter.predicates.is_empty() {
319                    return Ok(NextState::again(RowGroupDecoderState::StartData {
320                        row_group_info,
321                        column_chunks,
322                        cache_info: None,
323                    }));
324                };
325
326                // we have predicates to evaluate
327                let cache_projection =
328                    self.compute_cache_projection(row_group_info.row_group_idx, &filter);
329
330                let cache_info = CacheInfo::new(
331                    cache_projection,
332                    Arc::new(Mutex::new(RowGroupCache::new(
333                        self.batch_size,
334                        self.max_predicate_cache_size,
335                    ))),
336                );
337
338                let filter_info = FilterInfo::new(filter, cache_info);
339                NextState::again(RowGroupDecoderState::Filters {
340                    row_group_info,
341                    filter_info,
342                    column_chunks,
343                })
344            }
345            // need to evaluate filters
346            RowGroupDecoderState::Filters {
347                row_group_info,
348                column_chunks,
349                filter_info,
350            } => {
351                let RowGroupInfo {
352                    row_group_idx,
353                    row_count,
354                    plan_builder,
355                } = row_group_info;
356
357                // If nothing is selected, we are done with this row group
358                if !plan_builder.selects_any() {
359                    // ruled out entire row group
360                    self.filter = Some(filter_info.into_filter());
361                    return Ok(NextState::result(
362                        RowGroupDecoderState::Finished,
363                        DecodeResult::Finished,
364                    ));
365                }
366
367                // Make a request for the data needed to evaluate the current predicate
368                let predicate = filter_info.current();
369
370                // need to fetch pages the column needs for decoding, figure
371                // that out based on the current selection and projection
372                let data_request = DataRequestBuilder::new(
373                    row_group_idx,
374                    row_count,
375                    self.batch_size,
376                    &self.metadata,
377                    predicate.projection(), // use the predicate's projection
378                )
379                .with_selection(plan_builder.selection())
380                // Fetch predicate columns; expand selection only for cached predicate columns
381                .with_cache_projection(Some(filter_info.cache_projection()))
382                .with_column_chunks(column_chunks)
383                .build();
384
385                let row_group_info = RowGroupInfo {
386                    row_group_idx,
387                    row_count,
388                    plan_builder,
389                };
390
391                NextState::again(RowGroupDecoderState::WaitingOnFilterData {
392                    row_group_info,
393                    filter_info,
394                    data_request,
395                })
396            }
397            RowGroupDecoderState::WaitingOnFilterData {
398                row_group_info,
399                data_request,
400                mut filter_info,
401            } => {
402                // figure out what ranges we still need
403                let needed_ranges = data_request.needed_ranges(&self.buffers);
404                if !needed_ranges.is_empty() {
405                    // still need data
406                    return Ok(NextState::result(
407                        RowGroupDecoderState::WaitingOnFilterData {
408                            row_group_info,
409                            filter_info,
410                            data_request,
411                        },
412                        DecodeResult::NeedsData(needed_ranges),
413                    ));
414                }
415
416                // otherwise we have all the data we need to evaluate the predicate
417                let RowGroupInfo {
418                    row_group_idx,
419                    row_count,
420                    mut plan_builder,
421                } = row_group_info;
422
423                let predicate = filter_info.current();
424
425                let row_group = data_request.try_into_in_memory_row_group(
426                    row_group_idx,
427                    row_count,
428                    &self.metadata,
429                    predicate.projection(),
430                    &mut self.buffers,
431                )?;
432
433                let cache_options = filter_info.cache_builder().producer();
434
435                let array_reader = ArrayReaderBuilder::new(&row_group, &self.metrics)
436                    .with_cache_options(Some(&cache_options))
437                    .with_parquet_metadata(&self.metadata)
438                    .build_array_reader(self.fields.as_deref(), predicate.projection())?;
439
440                plan_builder =
441                    plan_builder.with_predicate(array_reader, filter_info.current_mut())?;
442
443                let row_group_info = RowGroupInfo {
444                    row_group_idx,
445                    row_count,
446                    plan_builder,
447                };
448
449                // Take back the column chunks that were read
450                let column_chunks = Some(row_group.column_chunks);
451
452                // advance to the next predicate, if any
453                match filter_info.advance() {
454                    AdvanceResult::Continue(filter_info) => {
455                        NextState::again(RowGroupDecoderState::Filters {
456                            row_group_info,
457                            column_chunks,
458                            filter_info,
459                        })
460                    }
461                    // done with predicates, proceed to reading data
462                    AdvanceResult::Done(filter, cache_info) => {
463                        // remember we need to put back the filter
464                        assert!(self.filter.is_none());
465                        self.filter = Some(filter);
466                        NextState::again(RowGroupDecoderState::StartData {
467                            row_group_info,
468                            column_chunks,
469                            cache_info: Some(cache_info),
470                        })
471                    }
472                }
473            }
474            RowGroupDecoderState::StartData {
475                row_group_info,
476                column_chunks,
477                cache_info,
478            } => {
479                let RowGroupInfo {
480                    row_group_idx,
481                    row_count,
482                    plan_builder,
483                } = row_group_info;
484
485                // Compute the number of rows in the selection before applying limit and offset
486                let rows_before = plan_builder.num_rows_selected().unwrap_or(row_count);
487
488                if rows_before == 0 {
489                    // ruled out entire row group
490                    return Ok(NextState::result(
491                        RowGroupDecoderState::Finished,
492                        DecodeResult::Finished,
493                    ));
494                }
495
496                // Apply any limit and offset
497                let mut plan_builder = plan_builder
498                    .limited(row_count)
499                    .with_offset(self.offset)
500                    .with_limit(self.limit)
501                    .build_limited();
502
503                let rows_after = plan_builder.num_rows_selected().unwrap_or(row_count);
504
505                // Update running offset and limit for after the current row group is read
506                if let Some(offset) = &mut self.offset {
507                    // Reduction is either because of offset or limit, as limit is applied
508                    // after offset has been "exhausted" can just use saturating sub here
509                    *offset = offset.saturating_sub(rows_before - rows_after)
510                }
511
512                if rows_after == 0 {
513                    // no rows left after applying limit/offset
514                    return Ok(NextState::result(
515                        RowGroupDecoderState::Finished,
516                        DecodeResult::Finished,
517                    ));
518                }
519
520                if let Some(limit) = &mut self.limit {
521                    *limit -= rows_after;
522                }
523
524                let data_request = DataRequestBuilder::new(
525                    row_group_idx,
526                    row_count,
527                    self.batch_size,
528                    &self.metadata,
529                    &self.projection,
530                )
531                .with_selection(plan_builder.selection())
532                .with_column_chunks(column_chunks)
533                // Final projection fetch shouldn't expand selection for cache
534                // so don't call with_cache_projection here
535                .build();
536
537                plan_builder = plan_builder.with_row_selection_policy(self.row_selection_policy);
538
539                plan_builder = override_selector_strategy_if_needed(
540                    plan_builder,
541                    &self.projection,
542                    self.row_group_offset_index(row_group_idx),
543                );
544
545                let row_group_info = RowGroupInfo {
546                    row_group_idx,
547                    row_count,
548                    plan_builder,
549                };
550
551                NextState::again(RowGroupDecoderState::WaitingOnData {
552                    row_group_info,
553                    data_request,
554                    cache_info,
555                })
556            }
557            // Waiting on data to proceed with reading the output
558            RowGroupDecoderState::WaitingOnData {
559                row_group_info,
560                data_request,
561                cache_info,
562            } => {
563                let needed_ranges = data_request.needed_ranges(&self.buffers);
564                if !needed_ranges.is_empty() {
565                    // still need data
566                    return Ok(NextState::result(
567                        RowGroupDecoderState::WaitingOnData {
568                            row_group_info,
569                            data_request,
570                            cache_info,
571                        },
572                        DecodeResult::NeedsData(needed_ranges),
573                    ));
574                }
575
576                // otherwise we have all the data we need to proceed
577                let RowGroupInfo {
578                    row_group_idx,
579                    row_count,
580                    plan_builder,
581                } = row_group_info;
582
583                let row_group = data_request.try_into_in_memory_row_group(
584                    row_group_idx,
585                    row_count,
586                    &self.metadata,
587                    &self.projection,
588                    &mut self.buffers,
589                )?;
590
591                let plan = plan_builder.build();
592
593                // if we have any cached results, connect them up
594                let array_reader_builder = ArrayReaderBuilder::new(&row_group, &self.metrics)
595                    .with_parquet_metadata(&self.metadata);
596                let array_reader = if let Some(cache_info) = cache_info.as_ref() {
597                    let cache_options = cache_info.builder().consumer();
598                    array_reader_builder
599                        .with_cache_options(Some(&cache_options))
600                        .build_array_reader(self.fields.as_deref(), &self.projection)
601                } else {
602                    array_reader_builder
603                        .build_array_reader(self.fields.as_deref(), &self.projection)
604                }?;
605
606                let reader = ParquetRecordBatchReader::new(array_reader, plan);
607                NextState::result(RowGroupDecoderState::Finished, DecodeResult::Data(reader))
608            }
609            RowGroupDecoderState::Finished => {
610                // nothing left to read
611                NextState::result(RowGroupDecoderState::Finished, DecodeResult::Finished)
612            }
613        };
614        Ok(result)
615    }
616
617    /// Which columns should be cached?
618    ///
619    /// Returns the columns that are used by the filters *and* then used in the
620    /// final projection, excluding any nested columns.
621    fn compute_cache_projection(&self, row_group_idx: usize, filter: &RowFilter) -> ProjectionMask {
622        let meta = self.metadata.row_group(row_group_idx);
623        match self.compute_cache_projection_inner(filter) {
624            Some(projection) => projection,
625            None => ProjectionMask::none(meta.columns().len()),
626        }
627    }
628
629    fn compute_cache_projection_inner(&self, filter: &RowFilter) -> Option<ProjectionMask> {
630        // Do not compute the projection mask if the predicate cache is disabled
631        if self.max_predicate_cache_size == 0 {
632            return None;
633        }
634        let mut cache_projection = filter.predicates.first()?.projection().clone();
635        for predicate in filter.predicates.iter() {
636            cache_projection.union(predicate.projection());
637        }
638        cache_projection.intersect(&self.projection);
639        self.exclude_nested_columns_from_cache(&cache_projection)
640    }
641
642    /// Exclude leaves belonging to roots that span multiple parquet leaves (i.e. nested columns)
643    fn exclude_nested_columns_from_cache(&self, mask: &ProjectionMask) -> Option<ProjectionMask> {
644        mask.without_nested_types(self.metadata.file_metadata().schema_descr())
645    }
646
647    /// Get the offset index for the specified row group, if any
648    fn row_group_offset_index(&self, row_group_idx: usize) -> Option<&[OffsetIndexMetaData]> {
649        self.metadata
650            .offset_index()
651            .filter(|index| !index.is_empty())
652            .and_then(|index| index.get(row_group_idx))
653            .map(|columns| columns.as_slice())
654    }
655}
656
657/// Override the selection strategy if needed.
658///
659/// Some pages can be skipped during row-group construction if they are not read
660/// by the selections. This means that the data pages for those rows are never
661/// loaded and definition/repetition levels are never read. When using
662/// `RowSelections` selection works because `skip_records()` handles this
663/// case and skips the page accordingly.
664///
665/// However, with the current mask design, all values must be read and decoded
666/// and then a mask filter is applied. Thus if any pages are skipped during
667/// row-group construction, the data pages are missing and cannot be decoded.
668///
669/// A simple example:
670/// * the page size is 2, the mask is 100001, row selection should be read(1) skip(4) read(1)
671/// * the `ColumnChunkData` would be page1(10), page2(skipped), page3(01)
672///
673/// Using the row selection to skip(4), page2 won't be read at all, so in this
674/// case we can't decode all the rows and apply a mask. To correctly apply the
675/// bit mask, we need all 6 values be read, but page2 is not in memory.
676fn override_selector_strategy_if_needed(
677    plan_builder: ReadPlanBuilder,
678    projection_mask: &ProjectionMask,
679    offset_index: Option<&[OffsetIndexMetaData]>,
680) -> ReadPlanBuilder {
681    // override only applies to Auto policy, If the policy is already Mask or Selectors, respect that
682    let RowSelectionPolicy::Auto { .. } = plan_builder.row_selection_policy() else {
683        return plan_builder;
684    };
685
686    let preferred_strategy = plan_builder.resolve_selection_strategy();
687
688    let force_selectors = matches!(preferred_strategy, RowSelectionStrategy::Mask)
689        && plan_builder.selection().is_some_and(|selection| {
690            selection.should_force_selectors(projection_mask, offset_index)
691        });
692
693    let resolved_strategy = if force_selectors {
694        RowSelectionStrategy::Selectors
695    } else {
696        preferred_strategy
697    };
698
699    // override the plan builder strategy with the resolved one
700    let new_policy = match resolved_strategy {
701        RowSelectionStrategy::Mask => RowSelectionPolicy::Mask,
702        RowSelectionStrategy::Selectors => RowSelectionPolicy::Selectors,
703    };
704
705    plan_builder.with_row_selection_policy(new_policy)
706}
707
#[cfg(test)]
mod tests {
    use super::*;

    /// Guards against `RowGroupDecoderState` growing unnoticed: the state is
    /// moved by value on every transition.
    #[test]
    fn test_structure_size() {
        let state_bytes = std::mem::size_of::<RowGroupDecoderState>();
        assert_eq!(state_bytes, 200);
    }
}
717}