Skip to main content

parquet/arrow/push_decoder/reader_builder/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18mod data;
19mod filter;
20
21use crate::DecodeResult;
22use crate::arrow::ProjectionMask;
23use crate::arrow::array_reader::{ArrayReaderBuilder, RowGroupCache};
24use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
25use crate::arrow::arrow_reader::selection::RowSelectionStrategy;
26use crate::arrow::arrow_reader::{
27    ParquetRecordBatchReader, ReadPlanBuilder, RowFilter, RowSelection, RowSelectionPolicy,
28};
29use crate::arrow::in_memory_row_group::ColumnChunkData;
30use crate::arrow::push_decoder::reader_builder::data::DataRequestBuilder;
31use crate::arrow::push_decoder::reader_builder::filter::CacheInfo;
32use crate::arrow::schema::ParquetField;
33use crate::errors::ParquetError;
34use crate::file::metadata::ParquetMetaData;
35use crate::file::page_index::offset_index::OffsetIndexMetaData;
36use crate::util::push_buffers::PushBuffers;
37use bytes::Bytes;
38use data::DataRequest;
39use filter::AdvanceResult;
40use filter::FilterInfo;
41use std::ops::Range;
42use std::sync::{Arc, Mutex};
43
44/// The current row group being read and the read plan
45#[derive(Debug)]
46struct RowGroupInfo {
47    row_group_idx: usize,
48    row_count: usize,
49    plan_builder: ReadPlanBuilder,
50}
51
52/// This is the inner state machine for reading a single row group.
53#[derive(Debug)]
54enum RowGroupDecoderState {
55    Start {
56        row_group_info: RowGroupInfo,
57    },
58    /// Planning filters, but haven't yet requested data to evaluate them
59    Filters {
60        row_group_info: RowGroupInfo,
61        /// Any previously read column chunk data from prior filters
62        column_chunks: Option<Vec<Option<Arc<ColumnChunkData>>>>,
63        filter_info: FilterInfo,
64    },
65    /// Needs data to evaluate current filter
66    WaitingOnFilterData {
67        row_group_info: RowGroupInfo,
68        filter_info: FilterInfo,
69        data_request: DataRequest,
70    },
71    /// Know what data to actually read, after all predicates
72    StartData {
73        row_group_info: RowGroupInfo,
74        /// Any previously read column chunk data from the filtering phase
75        column_chunks: Option<Vec<Option<Arc<ColumnChunkData>>>>,
76        /// Any cached filter results
77        cache_info: Option<CacheInfo>,
78    },
79    /// Needs data to proceed with reading the output
80    WaitingOnData {
81        row_group_info: RowGroupInfo,
82        data_request: DataRequest,
83        /// Any cached filter results
84        cache_info: Option<CacheInfo>,
85    },
86    /// Finished (or not yet started) reading this group
87    Finished,
88}
89
90/// Result of a state transition
91#[derive(Debug)]
92struct NextState {
93    next_state: RowGroupDecoderState,
94    /// result to return, if any
95    ///
96    /// * `Some`: the processing should stop and return the result
97    /// * `None`: processing should continue
98    result: Option<DecodeResult<ParquetRecordBatchReader>>,
99}
100
101impl NextState {
102    /// The next state with no result.
103    ///
104    /// This indicates processing should continue
105    fn again(next_state: RowGroupDecoderState) -> Self {
106        Self {
107            next_state,
108            result: None,
109        }
110    }
111
112    /// Create a NextState with a result that should be returned
113    fn result(
114        next_state: RowGroupDecoderState,
115        result: DecodeResult<ParquetRecordBatchReader>,
116    ) -> Self {
117        Self {
118            next_state,
119            result: Some(result),
120        }
121    }
122}
123
124/// Builder for [`ParquetRecordBatchReader`] for a single row group
125///
126/// This struct drives the main state machine for decoding each row group -- it
127/// determines what data is needed, and then assembles the
128/// `ParquetRecordBatchReader` when all data is available.
129#[derive(Debug)]
130pub(crate) struct RowGroupReaderBuilder {
131    /// The output batch size
132    batch_size: usize,
133
134    /// What columns to project (produce in each output batch)
135    projection: ProjectionMask,
136
137    /// The Parquet file metadata
138    metadata: Arc<ParquetMetaData>,
139
140    /// Top level parquet schema and arrow schema mapping
141    fields: Option<Arc<ParquetField>>,
142
143    /// Optional filter
144    filter: Option<RowFilter>,
145
146    /// Limit to apply to remaining row groups (decremented as rows are read)
147    limit: Option<usize>,
148
149    /// Offset to apply to remaining row groups (decremented as rows are read)
150    offset: Option<usize>,
151
152    /// The size in bytes of the predicate cache to use
153    ///
154    /// See [`RowGroupCache`] for details.
155    max_predicate_cache_size: usize,
156
157    /// The metrics collector
158    metrics: ArrowReaderMetrics,
159
160    /// Strategy for materialising row selections
161    row_selection_policy: RowSelectionPolicy,
162
163    /// Current state of the decoder.
164    ///
165    /// It is taken when processing, and must be put back before returning
166    /// it is a bug error if it is not put back after transitioning states.
167    state: Option<RowGroupDecoderState>,
168
169    /// The underlying data store
170    buffers: PushBuffers,
171}
172
173impl RowGroupReaderBuilder {
174    /// Create a new RowGroupReaderBuilder
175    #[expect(clippy::too_many_arguments)]
176    pub(crate) fn new(
177        batch_size: usize,
178        projection: ProjectionMask,
179        metadata: Arc<ParquetMetaData>,
180        fields: Option<Arc<ParquetField>>,
181        filter: Option<RowFilter>,
182        limit: Option<usize>,
183        offset: Option<usize>,
184        metrics: ArrowReaderMetrics,
185        max_predicate_cache_size: usize,
186        buffers: PushBuffers,
187        row_selection_policy: RowSelectionPolicy,
188    ) -> Self {
189        Self {
190            batch_size,
191            projection,
192            metadata,
193            fields,
194            filter,
195            limit,
196            offset,
197            metrics,
198            max_predicate_cache_size,
199            row_selection_policy,
200            state: Some(RowGroupDecoderState::Finished),
201            buffers,
202        }
203    }
204
205    /// Push new data buffers that can be used to satisfy pending requests
206    pub fn push_data(&mut self, ranges: Vec<Range<u64>>, buffers: Vec<Bytes>) {
207        self.buffers.push_ranges(ranges, buffers);
208    }
209
210    /// Returns the total number of buffered bytes available
211    pub fn buffered_bytes(&self) -> u64 {
212        self.buffers.buffered_bytes()
213    }
214
215    /// take the current state, leaving None in its place.
216    ///
217    /// Returns an error if there the state wasn't put back after the previous
218    /// call to [`Self::take_state`].
219    ///
220    /// Any code that calls this method must ensure that the state is put back
221    /// before returning, otherwise the reader will error next time it is called
222    fn take_state(&mut self) -> Result<RowGroupDecoderState, ParquetError> {
223        self.state.take().ok_or_else(|| {
224            ParquetError::General(String::from(
225                "Internal Error: RowGroupReader in invalid state",
226            ))
227        })
228    }
229
230    /// Setup this reader to read the next row group
231    pub(crate) fn next_row_group(
232        &mut self,
233        row_group_idx: usize,
234        row_count: usize,
235        selection: Option<RowSelection>,
236    ) -> Result<(), ParquetError> {
237        let state = self.take_state()?;
238        if !matches!(state, RowGroupDecoderState::Finished) {
239            return Err(ParquetError::General(format!(
240                "Internal Error: next_row_group called while still reading a row group. Expected Finished state, got {state:?}"
241            )));
242        }
243        let plan_builder = ReadPlanBuilder::new(self.batch_size)
244            .with_selection(selection)
245            .with_row_selection_policy(self.row_selection_policy);
246
247        let row_group_info = RowGroupInfo {
248            row_group_idx,
249            row_count,
250            plan_builder,
251        };
252
253        self.state = Some(RowGroupDecoderState::Start { row_group_info });
254        Ok(())
255    }
256
257    /// Try to build the next `ParquetRecordBatchReader` from this RowGroupReader.
258    ///
259    /// If more data is needed, returns [`DecodeResult::NeedsData`] with the
260    /// ranges of data that are needed to proceed.
261    ///
262    /// If a [`ParquetRecordBatchReader`] is ready, it is returned in
263    /// `DecodeResult::Data`.
264    pub(crate) fn try_build(
265        &mut self,
266    ) -> Result<DecodeResult<ParquetRecordBatchReader>, ParquetError> {
267        loop {
268            let current_state = self.take_state()?;
269            // Try to transition the decoder.
270            match self.try_transition(current_state)? {
271                // Either produced a batch reader, needed input, or finished
272                NextState {
273                    next_state,
274                    result: Some(result),
275                } => {
276                    // put back the next state
277                    self.state = Some(next_state);
278                    return Ok(result);
279                }
280                // completed one internal state, maybe can proceed further
281                NextState {
282                    next_state,
283                    result: None,
284                } => {
285                    // continue processing
286                    self.state = Some(next_state);
287                }
288            }
289        }
290    }
291
292    /// Current state --> next state + optional output
293    ///
294    /// This is the main state transition function for the row group reader
295    /// and encodes the row group decoding state machine.
296    ///
297    /// # Notes
298    ///
299    /// This structure is used to reduce the indentation level of the main loop
300    /// in try_build
301    fn try_transition(
302        &mut self,
303        current_state: RowGroupDecoderState,
304    ) -> Result<NextState, ParquetError> {
305        let result = match current_state {
306            RowGroupDecoderState::Start { row_group_info } => {
307                let column_chunks = None; // no prior column chunks
308
309                let Some(filter) = self.filter.take() else {
310                    // no filter, start trying to read data immediately
311                    return Ok(NextState::again(RowGroupDecoderState::StartData {
312                        row_group_info,
313                        column_chunks,
314                        cache_info: None,
315                    }));
316                };
317                // no predicates in filter, so start reading immediately
318                if filter.predicates.is_empty() {
319                    return Ok(NextState::again(RowGroupDecoderState::StartData {
320                        row_group_info,
321                        column_chunks,
322                        cache_info: None,
323                    }));
324                };
325
326                // we have predicates to evaluate
327                let cache_projection =
328                    self.compute_cache_projection(row_group_info.row_group_idx, &filter);
329
330                let cache_info = CacheInfo::new(
331                    cache_projection,
332                    Arc::new(Mutex::new(RowGroupCache::new(
333                        self.batch_size,
334                        self.max_predicate_cache_size,
335                    ))),
336                );
337
338                let filter_info = FilterInfo::new(filter, cache_info);
339                NextState::again(RowGroupDecoderState::Filters {
340                    row_group_info,
341                    filter_info,
342                    column_chunks,
343                })
344            }
345            // need to evaluate filters
346            RowGroupDecoderState::Filters {
347                row_group_info,
348                column_chunks,
349                filter_info,
350            } => {
351                let RowGroupInfo {
352                    row_group_idx,
353                    row_count,
354                    plan_builder,
355                } = row_group_info;
356
357                // If nothing is selected, we are done with this row group
358                if !plan_builder.selects_any() {
359                    // ruled out entire row group
360                    self.filter = Some(filter_info.into_filter());
361                    return Ok(NextState::result(
362                        RowGroupDecoderState::Finished,
363                        DecodeResult::Finished,
364                    ));
365                }
366
367                // Make a request for the data needed to evaluate the current predicate
368                let predicate = filter_info.current();
369
370                // need to fetch pages the column needs for decoding, figure
371                // that out based on the current selection and projection
372                let data_request = DataRequestBuilder::new(
373                    row_group_idx,
374                    row_count,
375                    self.batch_size,
376                    &self.metadata,
377                    predicate.projection(), // use the predicate's projection
378                )
379                .with_selection(plan_builder.selection())
380                // Fetch predicate columns; expand selection only for cached predicate columns
381                .with_cache_projection(Some(filter_info.cache_projection()))
382                .with_column_chunks(column_chunks)
383                .build();
384
385                let row_group_info = RowGroupInfo {
386                    row_group_idx,
387                    row_count,
388                    plan_builder,
389                };
390
391                NextState::again(RowGroupDecoderState::WaitingOnFilterData {
392                    row_group_info,
393                    filter_info,
394                    data_request,
395                })
396            }
397            RowGroupDecoderState::WaitingOnFilterData {
398                row_group_info,
399                data_request,
400                mut filter_info,
401            } => {
402                // figure out what ranges we still need
403                let needed_ranges = data_request.needed_ranges(&self.buffers);
404                if !needed_ranges.is_empty() {
405                    // still need data
406                    return Ok(NextState::result(
407                        RowGroupDecoderState::WaitingOnFilterData {
408                            row_group_info,
409                            filter_info,
410                            data_request,
411                        },
412                        DecodeResult::NeedsData(needed_ranges),
413                    ));
414                }
415
416                // otherwise we have all the data we need to evaluate the predicate
417                let RowGroupInfo {
418                    row_group_idx,
419                    row_count,
420                    mut plan_builder,
421                } = row_group_info;
422
423                let predicate = filter_info.current();
424
425                let row_group = data_request.try_into_in_memory_row_group(
426                    row_group_idx,
427                    row_count,
428                    &self.metadata,
429                    predicate.projection(),
430                    &mut self.buffers,
431                )?;
432
433                let cache_options = filter_info.cache_builder().producer();
434
435                let array_reader = ArrayReaderBuilder::new(&row_group, &self.metrics)
436                    .with_cache_options(Some(&cache_options))
437                    .with_parquet_metadata(&self.metadata)
438                    .build_array_reader(self.fields.as_deref(), predicate.projection())?;
439
440                // Prepare to evaluate the filter.
441                // Note: first update the selection strategy to properly handle any pages
442                // pruned during fetch
443                plan_builder = override_selector_strategy_if_needed(
444                    plan_builder,
445                    predicate.projection(),
446                    self.row_group_offset_index(row_group_idx),
447                );
448                // `with_predicate` actually evaluates the filter
449
450                plan_builder =
451                    plan_builder.with_predicate(array_reader, filter_info.current_mut())?;
452
453                let row_group_info = RowGroupInfo {
454                    row_group_idx,
455                    row_count,
456                    plan_builder,
457                };
458
459                // Take back the column chunks that were read
460                let column_chunks = Some(row_group.column_chunks);
461
462                // advance to the next predicate, if any
463                match filter_info.advance() {
464                    AdvanceResult::Continue(filter_info) => {
465                        NextState::again(RowGroupDecoderState::Filters {
466                            row_group_info,
467                            column_chunks,
468                            filter_info,
469                        })
470                    }
471                    // done with predicates, proceed to reading data
472                    AdvanceResult::Done(filter, cache_info) => {
473                        // remember we need to put back the filter
474                        assert!(self.filter.is_none());
475                        self.filter = Some(filter);
476                        NextState::again(RowGroupDecoderState::StartData {
477                            row_group_info,
478                            column_chunks,
479                            cache_info: Some(cache_info),
480                        })
481                    }
482                }
483            }
484            RowGroupDecoderState::StartData {
485                row_group_info,
486                column_chunks,
487                cache_info,
488            } => {
489                let RowGroupInfo {
490                    row_group_idx,
491                    row_count,
492                    plan_builder,
493                } = row_group_info;
494
495                // Compute the number of rows in the selection before applying limit and offset
496                let rows_before = plan_builder.num_rows_selected().unwrap_or(row_count);
497
498                if rows_before == 0 {
499                    // ruled out entire row group
500                    return Ok(NextState::result(
501                        RowGroupDecoderState::Finished,
502                        DecodeResult::Finished,
503                    ));
504                }
505
506                // Apply any limit and offset
507                let mut plan_builder = plan_builder
508                    .limited(row_count)
509                    .with_offset(self.offset)
510                    .with_limit(self.limit)
511                    .build_limited();
512
513                let rows_after = plan_builder.num_rows_selected().unwrap_or(row_count);
514
515                // Update running offset and limit for after the current row group is read
516                if let Some(offset) = &mut self.offset {
517                    // Reduction is either because of offset or limit, as limit is applied
518                    // after offset has been "exhausted" can just use saturating sub here
519                    *offset = offset.saturating_sub(rows_before - rows_after)
520                }
521
522                if rows_after == 0 {
523                    // no rows left after applying limit/offset
524                    return Ok(NextState::result(
525                        RowGroupDecoderState::Finished,
526                        DecodeResult::Finished,
527                    ));
528                }
529
530                if let Some(limit) = &mut self.limit {
531                    *limit -= rows_after;
532                }
533
534                let data_request = DataRequestBuilder::new(
535                    row_group_idx,
536                    row_count,
537                    self.batch_size,
538                    &self.metadata,
539                    &self.projection,
540                )
541                .with_selection(plan_builder.selection())
542                .with_column_chunks(column_chunks)
543                // Final projection fetch shouldn't expand selection for cache
544                // so don't call with_cache_projection here
545                .build();
546
547                plan_builder = plan_builder.with_row_selection_policy(self.row_selection_policy);
548
549                plan_builder = override_selector_strategy_if_needed(
550                    plan_builder,
551                    &self.projection,
552                    self.row_group_offset_index(row_group_idx),
553                );
554
555                let row_group_info = RowGroupInfo {
556                    row_group_idx,
557                    row_count,
558                    plan_builder,
559                };
560
561                NextState::again(RowGroupDecoderState::WaitingOnData {
562                    row_group_info,
563                    data_request,
564                    cache_info,
565                })
566            }
567            // Waiting on data to proceed with reading the output
568            RowGroupDecoderState::WaitingOnData {
569                row_group_info,
570                data_request,
571                cache_info,
572            } => {
573                let needed_ranges = data_request.needed_ranges(&self.buffers);
574                if !needed_ranges.is_empty() {
575                    // still need data
576                    return Ok(NextState::result(
577                        RowGroupDecoderState::WaitingOnData {
578                            row_group_info,
579                            data_request,
580                            cache_info,
581                        },
582                        DecodeResult::NeedsData(needed_ranges),
583                    ));
584                }
585
586                // otherwise we have all the data we need to proceed
587                let RowGroupInfo {
588                    row_group_idx,
589                    row_count,
590                    plan_builder,
591                } = row_group_info;
592
593                let row_group = data_request.try_into_in_memory_row_group(
594                    row_group_idx,
595                    row_count,
596                    &self.metadata,
597                    &self.projection,
598                    &mut self.buffers,
599                )?;
600
601                let plan = plan_builder.build();
602
603                // if we have any cached results, connect them up
604                let array_reader_builder = ArrayReaderBuilder::new(&row_group, &self.metrics)
605                    .with_parquet_metadata(&self.metadata);
606                let array_reader = if let Some(cache_info) = cache_info.as_ref() {
607                    let cache_options = cache_info.builder().consumer();
608                    array_reader_builder
609                        .with_cache_options(Some(&cache_options))
610                        .build_array_reader(self.fields.as_deref(), &self.projection)
611                } else {
612                    array_reader_builder
613                        .build_array_reader(self.fields.as_deref(), &self.projection)
614                }?;
615
616                let reader = ParquetRecordBatchReader::new(array_reader, plan);
617                NextState::result(RowGroupDecoderState::Finished, DecodeResult::Data(reader))
618            }
619            RowGroupDecoderState::Finished => {
620                // nothing left to read
621                NextState::result(RowGroupDecoderState::Finished, DecodeResult::Finished)
622            }
623        };
624        Ok(result)
625    }
626
627    /// Which columns should be cached?
628    ///
629    /// Returns the columns that are used by the filters *and* then used in the
630    /// final projection, excluding any nested columns.
631    fn compute_cache_projection(&self, row_group_idx: usize, filter: &RowFilter) -> ProjectionMask {
632        let meta = self.metadata.row_group(row_group_idx);
633        match self.compute_cache_projection_inner(filter) {
634            Some(projection) => projection,
635            None => ProjectionMask::none(meta.columns().len()),
636        }
637    }
638
639    fn compute_cache_projection_inner(&self, filter: &RowFilter) -> Option<ProjectionMask> {
640        // Do not compute the projection mask if the predicate cache is disabled
641        if self.max_predicate_cache_size == 0 {
642            return None;
643        }
644        let mut cache_projection = filter.predicates.first()?.projection().clone();
645        for predicate in filter.predicates.iter() {
646            cache_projection.union(predicate.projection());
647        }
648        cache_projection.intersect(&self.projection);
649        self.exclude_nested_columns_from_cache(&cache_projection)
650    }
651
652    /// Exclude leaves belonging to roots that span multiple parquet leaves (i.e. nested columns)
653    fn exclude_nested_columns_from_cache(&self, mask: &ProjectionMask) -> Option<ProjectionMask> {
654        mask.without_nested_types(self.metadata.file_metadata().schema_descr())
655    }
656
657    /// Get the offset index for the specified row group, if any
658    fn row_group_offset_index(&self, row_group_idx: usize) -> Option<&[OffsetIndexMetaData]> {
659        self.metadata
660            .offset_index()
661            .filter(|index| !index.is_empty())
662            .and_then(|index| index.get(row_group_idx))
663            .map(|columns| columns.as_slice())
664    }
665}
666
667/// Override the selection strategy if needed.
668///
669/// Some pages can be skipped during row-group construction if they are not read
670/// by the selections. This means that the data pages for those rows are never
671/// loaded and definition/repetition levels are never read. When using
672/// `RowSelections` selection works because `skip_records()` handles this
673/// case and skips the page accordingly.
674///
675/// However, with the current mask design, all values must be read and decoded
676/// and then a mask filter is applied. Thus if any pages are skipped during
677/// row-group construction, the data pages are missing and cannot be decoded.
678///
679/// A simple example:
680/// * the page size is 2, the mask is 100001, row selection should be read(1) skip(4) read(1)
681/// * the `ColumnChunkData` would be page1(10), page2(skipped), page3(01)
682///
683/// Using the row selection to skip(4), page2 won't be read at all, so in this
684/// case we can't decode all the rows and apply a mask. To correctly apply the
685/// bit mask, we need all 6 values be read, but page2 is not in memory.
686fn override_selector_strategy_if_needed(
687    plan_builder: ReadPlanBuilder,
688    projection_mask: &ProjectionMask,
689    offset_index: Option<&[OffsetIndexMetaData]>,
690) -> ReadPlanBuilder {
691    // override only applies to Auto policy, If the policy is already Mask or Selectors, respect that
692    let RowSelectionPolicy::Auto { .. } = plan_builder.row_selection_policy() else {
693        return plan_builder;
694    };
695
696    let preferred_strategy = plan_builder.resolve_selection_strategy();
697
698    let force_selectors = matches!(preferred_strategy, RowSelectionStrategy::Mask)
699        && plan_builder.selection().is_some_and(|selection| {
700            selection.should_force_selectors(projection_mask, offset_index)
701        });
702
703    let resolved_strategy = if force_selectors {
704        RowSelectionStrategy::Selectors
705    } else {
706        preferred_strategy
707    };
708
709    // override the plan builder strategy with the resolved one
710    let new_policy = match resolved_strategy {
711        RowSelectionStrategy::Mask => RowSelectionPolicy::Mask,
712        RowSelectionStrategy::Selectors => RowSelectionPolicy::Selectors,
713    };
714
715    plan_builder.with_row_selection_policy(new_policy)
716}
717
#[cfg(test)]
mod tests {
    use super::*;

    /// Guards against `RowGroupDecoderState` silently growing: every state
    /// transition moves the state by value, so its size is pinned here.
    #[test]
    fn test_structure_size() {
        let state_size = std::mem::size_of::<RowGroupDecoderState>();
        assert_eq!(state_size, 200);
    }
}