parquet/arrow/push_decoder/reader_builder/mod.rs
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

mod data;
mod filter;

use crate::DecodeResult;
use crate::arrow::ProjectionMask;
use crate::arrow::array_reader::{ArrayReaderBuilder, CacheOptions, RowGroupCache};
use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
use crate::arrow::arrow_reader::selection::RowSelectionStrategy;
use crate::arrow::arrow_reader::{
    ParquetRecordBatchReader, ReadPlanBuilder, RowFilter, RowSelection, RowSelectionPolicy,
};
use crate::arrow::in_memory_row_group::ColumnChunkData;
use crate::arrow::push_decoder::reader_builder::data::DataRequestBuilder;
use crate::arrow::push_decoder::reader_builder::filter::CacheInfo;
use crate::arrow::schema::ParquetField;
use crate::errors::ParquetError;
use crate::file::metadata::ParquetMetaData;
use crate::file::page_index::offset_index::OffsetIndexMetaData;
use crate::util::push_buffers::PushBuffers;
use bytes::Bytes;
use data::DataRequest;
use filter::AdvanceResult;
use filter::FilterInfo;
use std::ops::Range;
use std::sync::{Arc, RwLock};

/// The current row group being read and the read plan
#[derive(Debug)]
struct RowGroupInfo {
    row_group_idx: usize,
    row_count: usize,
    plan_builder: ReadPlanBuilder,
}

/// This is the inner state machine for reading a single row group.
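///
/// A sketch of the transitions, as driven by
/// `RowGroupReaderBuilder::try_transition` (the `Waiting*` states repeat
/// while more data is still needed; `Filters` and `StartData` can also move
/// straight to `Finished` when nothing is selected):
///
/// ```text
/// Start --> Filters --> WaitingOnFilterData
///   |          ^               |
///   |          +---------------+ (next predicate)
///   |                          |
///   |                          v (all predicates evaluated)
///   +--> StartData --> WaitingOnData --> Finished
///  (no filter / no predicates)
/// ```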
#[derive(Debug)]
enum RowGroupDecoderState {
    Start {
        row_group_info: RowGroupInfo,
    },
    /// Planning filters, but haven't yet requested data to evaluate them
    Filters {
        row_group_info: RowGroupInfo,
        /// Any previously read column chunk data from prior filters
        column_chunks: Option<Vec<Option<Arc<ColumnChunkData>>>>,
        filter_info: FilterInfo,
    },
    /// Needs data to evaluate the current filter
    WaitingOnFilterData {
        row_group_info: RowGroupInfo,
        filter_info: FilterInfo,
        data_request: DataRequest,
    },
    /// Knows what data to actually read, after all predicates have been evaluated
    StartData {
        row_group_info: RowGroupInfo,
        /// Any previously read column chunk data from the filtering phase
        column_chunks: Option<Vec<Option<Arc<ColumnChunkData>>>>,
        /// Any cached filter results
        cache_info: Option<CacheInfo>,
    },
    /// Needs data to proceed with reading the output
    WaitingOnData {
        row_group_info: RowGroupInfo,
        data_request: DataRequest,
        /// Any cached filter results
        cache_info: Option<CacheInfo>,
    },
    /// Finished (or not yet started) reading this group
    Finished,
}

/// Result of a state transition
#[derive(Debug)]
struct NextState {
    next_state: RowGroupDecoderState,
    /// result to return, if any
    ///
    /// * `Some`: the processing should stop and return the result
    /// * `None`: processing should continue
    result: Option<DecodeResult<ParquetRecordBatchReader>>,
}

impl NextState {
    /// The next state with no result.
    ///
    /// This indicates processing should continue
    fn again(next_state: RowGroupDecoderState) -> Self {
        Self {
            next_state,
            result: None,
        }
    }

    /// Create a NextState with a result that should be returned
    fn result(
        next_state: RowGroupDecoderState,
        result: DecodeResult<ParquetRecordBatchReader>,
    ) -> Self {
        Self {
            next_state,
            result: Some(result),
        }
    }
}

/// Builder for [`ParquetRecordBatchReader`] for a single row group
///
/// This struct drives the main state machine for decoding each row group -- it
/// determines what data is needed, and then assembles the
/// `ParquetRecordBatchReader` when all data is available.
#[derive(Debug)]
pub(crate) struct RowGroupReaderBuilder {
    /// The output batch size
    batch_size: usize,

    /// What columns to project (produce in each output batch)
    projection: ProjectionMask,

    /// The Parquet file metadata
    metadata: Arc<ParquetMetaData>,

    /// Top level parquet schema and arrow schema mapping
    fields: Option<Arc<ParquetField>>,

    /// Optional filter
    filter: Option<RowFilter>,

    /// Limit to apply to remaining row groups (decremented as rows are read)
    limit: Option<usize>,

    /// Offset to apply to remaining row groups (decremented as rows are read)
    offset: Option<usize>,

    /// The size in bytes of the predicate cache to use
    ///
    /// See [`RowGroupCache`] for details.
    max_predicate_cache_size: usize,

    /// The metrics collector
    metrics: ArrowReaderMetrics,

    /// Strategy for materialising row selections
    row_selection_policy: RowSelectionPolicy,

    /// Current state of the decoder.
    ///
    /// It is taken when processing, and must be put back before returning;
    /// it is a bug if it is not put back after transitioning states.
    state: Option<RowGroupDecoderState>,

    /// The underlying data store
    buffers: PushBuffers,
}

impl RowGroupReaderBuilder {
    /// Create a new RowGroupReaderBuilder
    #[expect(clippy::too_many_arguments)]
    pub(crate) fn new(
        batch_size: usize,
        projection: ProjectionMask,
        metadata: Arc<ParquetMetaData>,
        fields: Option<Arc<ParquetField>>,
        filter: Option<RowFilter>,
        limit: Option<usize>,
        offset: Option<usize>,
        metrics: ArrowReaderMetrics,
        max_predicate_cache_size: usize,
        buffers: PushBuffers,
        row_selection_policy: RowSelectionPolicy,
    ) -> Self {
        Self {
            batch_size,
            projection,
            metadata,
            fields,
            filter,
            limit,
            offset,
            metrics,
            max_predicate_cache_size,
            row_selection_policy,
            state: Some(RowGroupDecoderState::Finished),
            buffers,
        }
    }

    /// Push new data buffers that can be used to satisfy pending requests
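    ///
    /// Each entry in `ranges` describes the file byte range covered by the
    /// corresponding entry in `buffers`; the two vectors are paired up
    /// positionally.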
    pub fn push_data(&mut self, ranges: Vec<Range<u64>>, buffers: Vec<Bytes>) {
        self.buffers.push_ranges(ranges, buffers);
    }

    /// Returns the total number of buffered bytes available
    pub fn buffered_bytes(&self) -> u64 {
        self.buffers.buffered_bytes()
    }

    /// Take the current state, leaving `None` in its place.
    ///
    /// Returns an error if the state wasn't put back after the previous
    /// call to [`Self::take_state`].
    ///
    /// Any code that calls this method must ensure that the state is put back
    /// before returning, otherwise the reader will error the next time it is
    /// called.
    fn take_state(&mut self) -> Result<RowGroupDecoderState, ParquetError> {
        self.state.take().ok_or_else(|| {
            ParquetError::General(String::from(
                "Internal Error: RowGroupReader in invalid state",
            ))
        })
    }

    /// Set up this reader to read the next row group
    pub(crate) fn next_row_group(
        &mut self,
        row_group_idx: usize,
        row_count: usize,
        selection: Option<RowSelection>,
    ) -> Result<(), ParquetError> {
        let state = self.take_state()?;
        if !matches!(state, RowGroupDecoderState::Finished) {
            return Err(ParquetError::General(format!(
                "Internal Error: next_row_group called while still reading a row group. Expected Finished state, got {state:?}"
            )));
        }
        let plan_builder = ReadPlanBuilder::new(self.batch_size)
            .with_selection(selection)
            .with_row_selection_policy(self.row_selection_policy);

        let row_group_info = RowGroupInfo {
            row_group_idx,
            row_count,
            plan_builder,
        };

        self.state = Some(RowGroupDecoderState::Start { row_group_info });
        Ok(())
    }

    /// Try to build the next `ParquetRecordBatchReader` from this
    /// `RowGroupReaderBuilder`.
    ///
    /// If more data is needed, returns [`DecodeResult::NeedsData`] with the
    /// ranges of data that are needed to proceed.
    ///
    /// If a [`ParquetRecordBatchReader`] is ready, it is returned in
    /// `DecodeResult::Data`.
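    ///
    /// # Example
    ///
    /// A minimal sketch of the expected driver loop; the `fetch` step is a
    /// placeholder for however the caller reads bytes from the file:
    ///
    /// ```ignore
    /// builder.next_row_group(row_group_idx, row_count, selection)?;
    /// loop {
    ///     match builder.try_build()? {
    ///         DecodeResult::NeedsData(ranges) => {
    ///             // read the requested byte ranges (caller-defined `fetch`)
    ///             let buffers = fetch(&ranges);
    ///             builder.push_data(ranges, buffers);
    ///         }
    ///         DecodeResult::Data(reader) => {
    ///             for batch in reader {
    ///                 // process each RecordBatch
    ///             }
    ///         }
    ///         DecodeResult::Finished => break,
    ///     }
    /// }
    /// ```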
    pub(crate) fn try_build(
        &mut self,
    ) -> Result<DecodeResult<ParquetRecordBatchReader>, ParquetError> {
        loop {
            let current_state = self.take_state()?;
            // Try to transition the decoder.
            match self.try_transition(current_state)? {
                // Either produced a batch reader, needed input, or finished
                NextState {
                    next_state,
                    result: Some(result),
                } => {
                    // put back the next state
                    self.state = Some(next_state);
                    return Ok(result);
                }
                // completed one internal state, maybe can proceed further
                NextState {
                    next_state,
                    result: None,
                } => {
                    // continue processing
                    self.state = Some(next_state);
                }
            }
        }
    }

    /// Current state --> next state + optional output
    ///
    /// This is the main state transition function for the row group reader
    /// and encodes the row group decoding state machine.
    ///
    /// # Notes
    ///
    /// This structure is used to reduce the indentation level of the main loop
    /// in [`Self::try_build`]
    fn try_transition(
        &mut self,
        current_state: RowGroupDecoderState,
    ) -> Result<NextState, ParquetError> {
        let result = match current_state {
            RowGroupDecoderState::Start { row_group_info } => {
                let column_chunks = None; // no prior column chunks

                let Some(filter) = self.filter.take() else {
                    // no filter, start trying to read data immediately
                    return Ok(NextState::again(RowGroupDecoderState::StartData {
                        row_group_info,
                        column_chunks,
                        cache_info: None,
                    }));
                };
                // no predicates in filter, so start reading immediately
                if filter.predicates.is_empty() {
                    return Ok(NextState::again(RowGroupDecoderState::StartData {
                        row_group_info,
                        column_chunks,
                        cache_info: None,
                    }));
                };

                // we have predicates to evaluate
                let cache_projection =
                    self.compute_cache_projection(row_group_info.row_group_idx, &filter);

                let cache_info = CacheInfo::new(
                    cache_projection,
                    Arc::new(RwLock::new(RowGroupCache::new(
                        self.batch_size,
                        self.max_predicate_cache_size,
                    ))),
                );

                let filter_info = FilterInfo::new(filter, cache_info);
                NextState::again(RowGroupDecoderState::Filters {
                    row_group_info,
                    filter_info,
                    column_chunks,
                })
            }
            // need to evaluate filters
            RowGroupDecoderState::Filters {
                row_group_info,
                column_chunks,
                filter_info,
            } => {
                let RowGroupInfo {
                    row_group_idx,
                    row_count,
                    plan_builder,
                } = row_group_info;

                // If nothing is selected, we are done with this row group
                if !plan_builder.selects_any() {
                    // ruled out entire row group
                    self.filter = Some(filter_info.into_filter());
                    return Ok(NextState::result(
                        RowGroupDecoderState::Finished,
                        DecodeResult::Finished,
                    ));
                }

                // Make a request for the data needed to evaluate the current predicate
                let predicate = filter_info.current();

                // need to fetch the pages these columns need for decoding; figure
                // that out based on the current selection and projection
                let data_request = DataRequestBuilder::new(
                    row_group_idx,
                    row_count,
                    self.batch_size,
                    &self.metadata,
                    predicate.projection(), // use the predicate's projection
                )
                .with_selection(plan_builder.selection())
                // Fetch predicate columns; expand selection only for cached predicate columns
                .with_cache_projection(Some(filter_info.cache_projection()))
                .with_column_chunks(column_chunks)
                .build();

                let row_group_info = RowGroupInfo {
                    row_group_idx,
                    row_count,
                    plan_builder,
                };

                NextState::again(RowGroupDecoderState::WaitingOnFilterData {
                    row_group_info,
                    filter_info,
                    data_request,
                })
            }
            RowGroupDecoderState::WaitingOnFilterData {
                row_group_info,
                data_request,
                mut filter_info,
            } => {
                // figure out what ranges we still need
                let needed_ranges = data_request.needed_ranges(&self.buffers);
                if !needed_ranges.is_empty() {
                    // still need data
                    return Ok(NextState::result(
                        RowGroupDecoderState::WaitingOnFilterData {
                            row_group_info,
                            filter_info,
                            data_request,
                        },
                        DecodeResult::NeedsData(needed_ranges),
                    ));
                }

                // otherwise we have all the data we need to evaluate the predicate
                let RowGroupInfo {
                    row_group_idx,
                    row_count,
                    mut plan_builder,
                } = row_group_info;

                let predicate = filter_info.current();

                let row_group = data_request.try_into_in_memory_row_group(
                    row_group_idx,
                    row_count,
                    &self.metadata,
                    predicate.projection(),
                    &mut self.buffers,
                )?;

                let cache_options = filter_info.cache_builder().producer();

                let array_reader = ArrayReaderBuilder::new(&row_group, &self.metrics)
                    .with_cache_options(Some(&cache_options))
                    .with_parquet_metadata(&self.metadata)
                    .build_array_reader(self.fields.as_deref(), predicate.projection())?;

                // Reset to the original policy before each predicate so the override
                // can detect page skipping for THIS predicate's columns.
                // Without this reset, a prior predicate's override (e.g. Mask)
                // carries forward and the check returns early, missing unfetched
                // pages for subsequent predicates.
                plan_builder = plan_builder.with_row_selection_policy(self.row_selection_policy);

                // Prepare to evaluate the filter.
                // Note: first update the selection strategy to properly handle any pages
                // pruned during fetch
                plan_builder = override_selector_strategy_if_needed(
                    plan_builder,
                    predicate.projection(),
                    self.row_group_offset_index(row_group_idx),
                );

                // `with_predicate` actually evaluates the filter
                plan_builder =
                    plan_builder.with_predicate(array_reader, filter_info.current_mut())?;

                let row_group_info = RowGroupInfo {
                    row_group_idx,
                    row_count,
                    plan_builder,
                };

                // Take back the column chunks that were read
                let column_chunks = Some(row_group.column_chunks);

                // advance to the next predicate, if any
                match filter_info.advance() {
                    AdvanceResult::Continue(filter_info) => {
                        NextState::again(RowGroupDecoderState::Filters {
                            row_group_info,
                            column_chunks,
                            filter_info,
                        })
                    }
                    // done with predicates, proceed to reading data
                    AdvanceResult::Done(filter, cache_info) => {
                        // remember we need to put back the filter
                        assert!(self.filter.is_none());
                        self.filter = Some(filter);
                        NextState::again(RowGroupDecoderState::StartData {
                            row_group_info,
                            column_chunks,
                            cache_info: Some(cache_info),
                        })
                    }
                }
            }
            RowGroupDecoderState::StartData {
                row_group_info,
                column_chunks,
                cache_info,
            } => {
                let RowGroupInfo {
                    row_group_idx,
                    row_count,
                    plan_builder,
                } = row_group_info;

                // Compute the number of rows in the selection before applying limit and offset
                let rows_before = plan_builder.num_rows_selected().unwrap_or(row_count);

                if rows_before == 0 {
                    // ruled out entire row group
                    return Ok(NextState::result(
                        RowGroupDecoderState::Finished,
                        DecodeResult::Finished,
                    ));
                }

                // Apply any limit and offset
                let mut plan_builder = plan_builder
                    .limited(row_count)
                    .with_offset(self.offset)
                    .with_limit(self.limit)
                    .build_limited();

                let rows_after = plan_builder.num_rows_selected().unwrap_or(row_count);

                // Update the running offset and limit for after the current row group is read
                if let Some(offset) = &mut self.offset {
                    // The reduction is due to either offset or limit; since limit is
                    // applied only after the offset has been "exhausted", we can just
                    // use saturating_sub here
                    *offset = offset.saturating_sub(rows_before - rows_after)
                }

                if rows_after == 0 {
                    // no rows left after applying limit/offset
                    return Ok(NextState::result(
                        RowGroupDecoderState::Finished,
                        DecodeResult::Finished,
                    ));
                }

                if let Some(limit) = &mut self.limit {
                    *limit -= rows_after;
                }
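                // Worked example (hypothetical numbers): with offset = 5,
                // limit = 10, and 20 rows initially selected, rows_before = 20
                // and rows_after = 10 (5 rows dropped by the offset, then
                // capped at 10 by the limit). The running offset becomes
                // 5.saturating_sub(20 - 10) = 0 and the running limit becomes
                // 10 - 10 = 0 for subsequent row groups.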

                let data_request = DataRequestBuilder::new(
                    row_group_idx,
                    row_count,
                    self.batch_size,
                    &self.metadata,
                    &self.projection,
                )
                .with_selection(plan_builder.selection())
                .with_column_chunks(column_chunks)
                // Final projection fetch shouldn't expand selection for cache
                // so don't call with_cache_projection here
                .build();

                plan_builder = plan_builder.with_row_selection_policy(self.row_selection_policy);

                plan_builder = override_selector_strategy_if_needed(
                    plan_builder,
                    &self.projection,
                    self.row_group_offset_index(row_group_idx),
                );

                let row_group_info = RowGroupInfo {
                    row_group_idx,
                    row_count,
                    plan_builder,
                };

                NextState::again(RowGroupDecoderState::WaitingOnData {
                    row_group_info,
                    data_request,
                    cache_info,
                })
            }
            // Waiting on data to proceed with reading the output
            RowGroupDecoderState::WaitingOnData {
                row_group_info,
                data_request,
                cache_info,
            } => {
                let needed_ranges = data_request.needed_ranges(&self.buffers);
                if !needed_ranges.is_empty() {
                    // still need data
                    return Ok(NextState::result(
                        RowGroupDecoderState::WaitingOnData {
                            row_group_info,
                            data_request,
                            cache_info,
                        },
                        DecodeResult::NeedsData(needed_ranges),
                    ));
                }

                // otherwise we have all the data we need to proceed
                let RowGroupInfo {
                    row_group_idx,
                    row_count,
                    plan_builder,
                } = row_group_info;

                let row_group = data_request.try_into_in_memory_row_group(
                    row_group_idx,
                    row_count,
                    &self.metadata,
                    &self.projection,
                    &mut self.buffers,
                )?;

                let plan = plan_builder.build();

                // if we have any cached results, connect them up
                let array_reader_builder = ArrayReaderBuilder::new(&row_group, &self.metrics)
                    .with_parquet_metadata(&self.metadata);
                let array_reader = if let Some(cache_info) = cache_info.as_ref() {
                    let cache_options: CacheOptions = cache_info.builder().consumer();
                    array_reader_builder
                        .with_cache_options(Some(&cache_options))
                        .build_array_reader(self.fields.as_deref(), &self.projection)
                } else {
                    array_reader_builder
                        .build_array_reader(self.fields.as_deref(), &self.projection)
                }?;

                let reader = ParquetRecordBatchReader::new(array_reader, plan);
                NextState::result(RowGroupDecoderState::Finished, DecodeResult::Data(reader))
            }
            RowGroupDecoderState::Finished => {
                // nothing left to read
                NextState::result(RowGroupDecoderState::Finished, DecodeResult::Finished)
            }
        };
        Ok(result)
    }

    /// Which columns should be cached?
    ///
    /// Returns the columns that are used by the filters *and* then used in the
    /// final projection, excluding any nested columns.
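    ///
    /// For example (hypothetical column indices): if the predicates project
    /// columns `{0, 2}` and `{1, 2}` and the final projection is `{2, 3}`,
    /// the union of the predicate projections is `{0, 1, 2}` and the cached
    /// set is the intersection `{2}`.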
    fn compute_cache_projection(&self, row_group_idx: usize, filter: &RowFilter) -> ProjectionMask {
        let meta = self.metadata.row_group(row_group_idx);
        match self.compute_cache_projection_inner(filter) {
            Some(projection) => projection,
            None => ProjectionMask::none(meta.columns().len()),
        }
    }

    fn compute_cache_projection_inner(&self, filter: &RowFilter) -> Option<ProjectionMask> {
        // Do not compute the projection mask if the predicate cache is disabled
        if self.max_predicate_cache_size == 0 {
            return None;
        }
        let mut cache_projection = filter.predicates.first()?.projection().clone();
        for predicate in filter.predicates.iter() {
            cache_projection.union(predicate.projection());
        }
        cache_projection.intersect(&self.projection);
        self.exclude_nested_columns_from_cache(&cache_projection)
    }

    /// Exclude leaves belonging to roots that span multiple parquet leaves (i.e. nested columns)
    fn exclude_nested_columns_from_cache(&self, mask: &ProjectionMask) -> Option<ProjectionMask> {
        mask.without_nested_types(self.metadata.file_metadata().schema_descr())
    }

    /// Get the offset index for the specified row group, if any
    fn row_group_offset_index(&self, row_group_idx: usize) -> Option<&[OffsetIndexMetaData]> {
        self.metadata
            .offset_index()
            .filter(|index| !index.is_empty())
            .and_then(|index| index.get(row_group_idx))
            .map(|columns| columns.as_slice())
    }
}

/// Override the selection strategy if needed.
///
/// Some pages can be skipped during row-group construction if they are not read
/// by the selections. This means that the data pages for those rows are never
/// loaded and definition/repetition levels are never read. When using the
/// `Selectors` strategy this works because `skip_records()` handles this
/// case and skips the page accordingly.
///
/// However, with the current mask design, all values must be read and decoded
/// and then a mask filter is applied. Thus if any pages are skipped during
/// row-group construction, the data pages are missing and cannot be decoded.
///
/// A simple example:
/// * the page size is 2, the mask is `100001`, and the equivalent row selection is read(1) skip(4) read(1)
/// * the `ColumnChunkData` would be page1(10), page2(skipped), page3(01)
///
/// Using the row selection to skip(4), page2 won't be read at all, so in this
/// case we can't decode all the rows and apply a mask. To correctly apply the
/// bit mask, we need all 6 values to be read, but page2 is not in memory.
fn override_selector_strategy_if_needed(
    plan_builder: ReadPlanBuilder,
    projection_mask: &ProjectionMask,
    offset_index: Option<&[OffsetIndexMetaData]>,
) -> ReadPlanBuilder {
    // The override only applies to the Auto policy; if the policy is already
    // Mask or Selectors, respect that
    let RowSelectionPolicy::Auto { .. } = plan_builder.row_selection_policy() else {
        return plan_builder;
    };

    let preferred_strategy = plan_builder.resolve_selection_strategy();

    let force_selectors = matches!(preferred_strategy, RowSelectionStrategy::Mask)
        && plan_builder.selection().is_some_and(|selection| {
            selection.should_force_selectors(projection_mask, offset_index)
        });

    let resolved_strategy = if force_selectors {
        RowSelectionStrategy::Selectors
    } else {
        preferred_strategy
    };

    // override the plan builder strategy with the resolved one
    let new_policy = match resolved_strategy {
        RowSelectionStrategy::Mask => RowSelectionPolicy::Mask,
        RowSelectionStrategy::Selectors => RowSelectionPolicy::Selectors,
    };

    plan_builder.with_row_selection_policy(new_policy)
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    // Verify that the size of RowGroupDecoderState does not grow too large
    fn test_structure_size() {
        assert_eq!(std::mem::size_of::<RowGroupDecoderState>(), 200);
    }
}