parquet/arrow/push_decoder/reader_builder/mod.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18mod data;
19mod filter;
20
21use crate::DecodeResult;
22use crate::arrow::ProjectionMask;
23use crate::arrow::array_reader::{ArrayReaderBuilder, CacheOptions, RowGroupCache};
24use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
25use crate::arrow::arrow_reader::selection::RowSelectionStrategy;
26use crate::arrow::arrow_reader::{
27 ParquetRecordBatchReader, ReadPlanBuilder, RowFilter, RowSelection, RowSelectionPolicy,
28};
29use crate::arrow::in_memory_row_group::ColumnChunkData;
30use crate::arrow::push_decoder::reader_builder::data::DataRequestBuilder;
31use crate::arrow::push_decoder::reader_builder::filter::CacheInfo;
32use crate::arrow::schema::ParquetField;
33use crate::errors::ParquetError;
34use crate::file::metadata::ParquetMetaData;
35use crate::file::page_index::offset_index::OffsetIndexMetaData;
36use crate::util::push_buffers::PushBuffers;
37use bytes::Bytes;
38use data::DataRequest;
39use filter::AdvanceResult;
40use filter::FilterInfo;
41use std::ops::Range;
42use std::sync::{Arc, RwLock};
43
/// The current row group being read and the read plan
#[derive(Debug)]
struct RowGroupInfo {
    /// Index of this row group within the file's metadata
    row_group_idx: usize,
    /// Total number of rows in this row group
    row_count: usize,
    /// Builder for the read plan (selection, batch size, selection policy)
    /// that is refined as each predicate is evaluated
    plan_builder: ReadPlanBuilder,
}
51
/// This is the inner state machine for reading a single row group.
#[derive(Debug)]
enum RowGroupDecoderState {
    /// Initial state: filters (if any) have not yet been planned
    Start {
        row_group_info: RowGroupInfo,
    },
    /// Planning filters, but haven't yet requested data to evaluate them
    Filters {
        row_group_info: RowGroupInfo,
        /// Any previously read column chunk data from prior filters
        column_chunks: Option<Vec<Option<Arc<ColumnChunkData>>>>,
        /// Tracks the predicate currently being evaluated (advanced via
        /// `FilterInfo::advance`) and the predicate result cache
        filter_info: FilterInfo,
    },
    /// Needs data to evaluate current filter
    WaitingOnFilterData {
        row_group_info: RowGroupInfo,
        filter_info: FilterInfo,
        /// Outstanding request for the pages the current predicate needs
        data_request: DataRequest,
    },
    /// Know what data to actually read, after all predicates
    StartData {
        row_group_info: RowGroupInfo,
        /// Any previously read column chunk data from the filtering phase
        column_chunks: Option<Vec<Option<Arc<ColumnChunkData>>>>,
        /// Any cached filter results
        cache_info: Option<CacheInfo>,
    },
    /// Needs data to proceed with reading the output
    WaitingOnData {
        row_group_info: RowGroupInfo,
        /// Outstanding request for the pages of the final output projection
        data_request: DataRequest,
        /// Any cached filter results
        cache_info: Option<CacheInfo>,
    },
    /// Finished (or not yet started) reading this group
    Finished,
}
89
/// Result of a state transition
#[derive(Debug)]
struct NextState {
    /// The state the decoder moves to next
    next_state: RowGroupDecoderState,
    /// result to return, if any
    ///
    /// * `Some`: the processing should stop and return the result
    /// * `None`: processing should continue
    result: Option<DecodeResult<ParquetRecordBatchReader>>,
}
100
101impl NextState {
102 /// The next state with no result.
103 ///
104 /// This indicates processing should continue
105 fn again(next_state: RowGroupDecoderState) -> Self {
106 Self {
107 next_state,
108 result: None,
109 }
110 }
111
112 /// Create a NextState with a result that should be returned
113 fn result(
114 next_state: RowGroupDecoderState,
115 result: DecodeResult<ParquetRecordBatchReader>,
116 ) -> Self {
117 Self {
118 next_state,
119 result: Some(result),
120 }
121 }
122}
123
/// Builder for [`ParquetRecordBatchReader`] for a single row group
///
/// This struct drives the main state machine for decoding each row group -- it
/// determines what data is needed, and then assembles the
/// `ParquetRecordBatchReader` when all data is available.
#[derive(Debug)]
pub(crate) struct RowGroupReaderBuilder {
    /// The output batch size
    batch_size: usize,

    /// What columns to project (produce in each output batch)
    projection: ProjectionMask,

    /// The Parquet file metadata
    metadata: Arc<ParquetMetaData>,

    /// Top level parquet schema and arrow schema mapping
    fields: Option<Arc<ParquetField>>,

    /// Optional filter
    ///
    /// Taken while predicates are being evaluated and put back afterwards so
    /// it can be reused for subsequent row groups
    filter: Option<RowFilter>,

    /// Limit to apply to remaining row groups (decremented as rows are read)
    limit: Option<usize>,

    /// Offset to apply to remaining row groups (decremented as rows are read)
    offset: Option<usize>,

    /// The size in bytes of the predicate cache to use
    ///
    /// See [`RowGroupCache`] for details.
    max_predicate_cache_size: usize,

    /// The metrics collector
    metrics: ArrowReaderMetrics,

    /// Strategy for materialising row selections
    row_selection_policy: RowSelectionPolicy,

    /// Current state of the decoder.
    ///
    /// It is taken when processing, and must be put back before returning;
    /// it is a bug if it is not put back after transitioning states.
    state: Option<RowGroupDecoderState>,

    /// The underlying data store
    buffers: PushBuffers,
}
172
173impl RowGroupReaderBuilder {
    /// Create a new RowGroupReaderBuilder
    ///
    /// The builder starts in the `Finished` state; callers must invoke
    /// [`Self::next_row_group`] before attempting to build a reader.
    #[expect(clippy::too_many_arguments)]
    pub(crate) fn new(
        batch_size: usize,
        projection: ProjectionMask,
        metadata: Arc<ParquetMetaData>,
        fields: Option<Arc<ParquetField>>,
        filter: Option<RowFilter>,
        limit: Option<usize>,
        offset: Option<usize>,
        metrics: ArrowReaderMetrics,
        max_predicate_cache_size: usize,
        buffers: PushBuffers,
        row_selection_policy: RowSelectionPolicy,
    ) -> Self {
        Self {
            batch_size,
            projection,
            metadata,
            fields,
            filter,
            limit,
            offset,
            metrics,
            max_predicate_cache_size,
            row_selection_policy,
            // `Finished` doubles as "not yet started": `next_row_group`
            // requires this state before beginning a new group
            state: Some(RowGroupDecoderState::Finished),
            buffers,
        }
    }
204
    /// Push new data buffers that can be used to satisfy pending requests
    ///
    /// `ranges` are the file byte ranges covered by the corresponding
    /// entries in `buffers`.
    pub fn push_data(&mut self, ranges: Vec<Range<u64>>, buffers: Vec<Bytes>) {
        self.buffers.push_ranges(ranges, buffers);
    }
209
    /// Returns the total number of buffered bytes available
    pub fn buffered_bytes(&self) -> u64 {
        self.buffers.buffered_bytes()
    }
214
    /// Clear any staged ranges currently buffered for future decode work.
    pub fn clear_all_ranges(&mut self) {
        self.buffers.clear_all_ranges();
    }
219
    /// take the current state, leaving None in its place.
    ///
    /// Returns an error if the state wasn't put back after the previous
    /// call to [`Self::take_state`].
    ///
    /// Any code that calls this method must ensure that the state is put back
    /// before returning, otherwise the reader will error next time it is called
    fn take_state(&mut self) -> Result<RowGroupDecoderState, ParquetError> {
        self.state.take().ok_or_else(|| {
            ParquetError::General(String::from(
                "Internal Error: RowGroupReader in invalid state",
            ))
        })
    }
234
    /// Setup this reader to read the next row group
    ///
    /// * `row_group_idx`: index of the row group in the file metadata
    /// * `row_count`: total rows in that row group
    /// * `selection`: optional pre-computed row selection (e.g. from page
    ///   index pruning) to apply to this row group
    ///
    /// # Errors
    /// Returns an error if the previous row group has not finished decoding
    /// (i.e. the state is not `Finished`).
    pub(crate) fn next_row_group(
        &mut self,
        row_group_idx: usize,
        row_count: usize,
        selection: Option<RowSelection>,
    ) -> Result<(), ParquetError> {
        let state = self.take_state()?;
        if !matches!(state, RowGroupDecoderState::Finished) {
            return Err(ParquetError::General(format!(
                "Internal Error: next_row_group called while still reading a row group. Expected Finished state, got {state:?}"
            )));
        }
        let plan_builder = ReadPlanBuilder::new(self.batch_size)
            .with_selection(selection)
            .with_row_selection_policy(self.row_selection_policy);

        let row_group_info = RowGroupInfo {
            row_group_idx,
            row_count,
            plan_builder,
        };

        self.state = Some(RowGroupDecoderState::Start { row_group_info });
        Ok(())
    }
261
262 /// Try to build the next `ParquetRecordBatchReader` from this RowGroupReader.
263 ///
264 /// If more data is needed, returns [`DecodeResult::NeedsData`] with the
265 /// ranges of data that are needed to proceed.
266 ///
267 /// If a [`ParquetRecordBatchReader`] is ready, it is returned in
268 /// `DecodeResult::Data`.
269 pub(crate) fn try_build(
270 &mut self,
271 ) -> Result<DecodeResult<ParquetRecordBatchReader>, ParquetError> {
272 loop {
273 let current_state = self.take_state()?;
274 // Try to transition the decoder.
275 match self.try_transition(current_state)? {
276 // Either produced a batch reader, needed input, or finished
277 NextState {
278 next_state,
279 result: Some(result),
280 } => {
281 // put back the next state
282 self.state = Some(next_state);
283 return Ok(result);
284 }
285 // completed one internal state, maybe can proceed further
286 NextState {
287 next_state,
288 result: None,
289 } => {
290 // continue processing
291 self.state = Some(next_state);
292 }
293 }
294 }
295 }
296
    /// Current state --> next state + optional output
    ///
    /// This is the main state transition function for the row group reader
    /// and encodes the row group decoding state machine.
    ///
    /// # Notes
    ///
    /// This structure is used to reduce the indentation level of the main loop
    /// in try_build
    fn try_transition(
        &mut self,
        current_state: RowGroupDecoderState,
    ) -> Result<NextState, ParquetError> {
        let result = match current_state {
            // Start: decide whether filters must be evaluated first
            RowGroupDecoderState::Start { row_group_info } => {
                let column_chunks = None; // no prior column chunks

                let Some(filter) = self.filter.take() else {
                    // no filter, start trying to read data immediately
                    return Ok(NextState::again(RowGroupDecoderState::StartData {
                        row_group_info,
                        column_chunks,
                        cache_info: None,
                    }));
                };
                // no predicates in filter, so start reading immediately
                // NOTE(review): the empty filter taken above is dropped here and
                // never restored to `self.filter`; harmless since it has no
                // predicates, but worth confirming this is intentional
                if filter.predicates.is_empty() {
                    return Ok(NextState::again(RowGroupDecoderState::StartData {
                        row_group_info,
                        column_chunks,
                        cache_info: None,
                    }));
                };

                // we have predicates to evaluate
                let cache_projection =
                    self.compute_cache_projection(row_group_info.row_group_idx, &filter);

                let cache_info = CacheInfo::new(
                    cache_projection,
                    Arc::new(RwLock::new(RowGroupCache::new(
                        self.batch_size,
                        self.max_predicate_cache_size,
                    ))),
                );

                let filter_info = FilterInfo::new(filter, cache_info);
                NextState::again(RowGroupDecoderState::Filters {
                    row_group_info,
                    filter_info,
                    column_chunks,
                })
            }
            // need to evaluate filters
            RowGroupDecoderState::Filters {
                row_group_info,
                column_chunks,
                filter_info,
            } => {
                let RowGroupInfo {
                    row_group_idx,
                    row_count,
                    plan_builder,
                } = row_group_info;

                // If nothing is selected, we are done with this row group
                if !plan_builder.selects_any() {
                    // ruled out entire row group; put the filter back so it
                    // can be used for subsequent row groups
                    self.filter = Some(filter_info.into_filter());
                    return Ok(NextState::result(
                        RowGroupDecoderState::Finished,
                        DecodeResult::Finished,
                    ));
                }

                // Make a request for the data needed to evaluate the current predicate
                let predicate = filter_info.current();

                // need to fetch pages the column needs for decoding, figure
                // that out based on the current selection and projection
                let data_request = DataRequestBuilder::new(
                    row_group_idx,
                    row_count,
                    self.batch_size,
                    &self.metadata,
                    predicate.projection(), // use the predicate's projection
                )
                .with_selection(plan_builder.selection())
                // Fetch predicate columns; expand selection only for cached predicate columns
                .with_cache_projection(Some(filter_info.cache_projection()))
                .with_column_chunks(column_chunks)
                .build();

                // re-assemble the RowGroupInfo destructured above
                let row_group_info = RowGroupInfo {
                    row_group_idx,
                    row_count,
                    plan_builder,
                };

                NextState::again(RowGroupDecoderState::WaitingOnFilterData {
                    row_group_info,
                    filter_info,
                    data_request,
                })
            }
            // have requested data for a predicate; evaluate it once available
            RowGroupDecoderState::WaitingOnFilterData {
                row_group_info,
                data_request,
                mut filter_info,
            } => {
                // figure out what ranges we still need
                let needed_ranges = data_request.needed_ranges(&self.buffers);
                if !needed_ranges.is_empty() {
                    // still need data: stay in this state and report the ranges
                    return Ok(NextState::result(
                        RowGroupDecoderState::WaitingOnFilterData {
                            row_group_info,
                            filter_info,
                            data_request,
                        },
                        DecodeResult::NeedsData(needed_ranges),
                    ));
                }

                // otherwise we have all the data we need to evaluate the predicate
                let RowGroupInfo {
                    row_group_idx,
                    row_count,
                    mut plan_builder,
                } = row_group_info;

                let predicate = filter_info.current();

                let row_group = data_request.try_into_in_memory_row_group(
                    row_group_idx,
                    row_count,
                    &self.metadata,
                    predicate.projection(),
                    &mut self.buffers,
                )?;

                // producer side of the predicate cache: stores decoded values
                // so the final output phase can reuse them
                let cache_options = filter_info.cache_builder().producer();

                let array_reader = ArrayReaderBuilder::new(&row_group, &self.metrics)
                    .with_cache_options(Some(&cache_options))
                    .with_parquet_metadata(&self.metadata)
                    .build_array_reader(self.fields.as_deref(), predicate.projection())?;

                // Reset to original policy before each predicate so the override
                // can detect page skipping for THIS predicate's columns.
                // Without this reset, a prior predicate's override (e.g. Mask)
                // carries forward and the check returns early, missing unfetched
                // pages for subsequent predicates.
                plan_builder = plan_builder.with_row_selection_policy(self.row_selection_policy);

                // Prepare to evaluate the filter.
                // Note: first update the selection strategy to properly handle any pages
                // pruned during fetch
                plan_builder = override_selector_strategy_if_needed(
                    plan_builder,
                    predicate.projection(),
                    self.row_group_offset_index(row_group_idx),
                );
                // `with_predicate` actually evaluates the filter

                plan_builder =
                    plan_builder.with_predicate(array_reader, filter_info.current_mut())?;

                let row_group_info = RowGroupInfo {
                    row_group_idx,
                    row_count,
                    plan_builder,
                };

                // Take back the column chunks that were read
                let column_chunks = Some(row_group.column_chunks);

                // advance to the next predicate, if any
                match filter_info.advance() {
                    AdvanceResult::Continue(filter_info) => {
                        NextState::again(RowGroupDecoderState::Filters {
                            row_group_info,
                            column_chunks,
                            filter_info,
                        })
                    }
                    // done with predicates, proceed to reading data
                    AdvanceResult::Done(filter, cache_info) => {
                        // remember we need to put back the filter
                        assert!(self.filter.is_none());
                        self.filter = Some(filter);
                        NextState::again(RowGroupDecoderState::StartData {
                            row_group_info,
                            column_chunks,
                            cache_info: Some(cache_info),
                        })
                    }
                }
            }
            // all predicates evaluated; plan and request the output data
            RowGroupDecoderState::StartData {
                row_group_info,
                column_chunks,
                cache_info,
            } => {
                let RowGroupInfo {
                    row_group_idx,
                    row_count,
                    plan_builder,
                } = row_group_info;

                // Compute the number of rows in the selection before applying limit and offset
                let rows_before = plan_builder.num_rows_selected().unwrap_or(row_count);

                if rows_before == 0 {
                    // ruled out entire row group
                    return Ok(NextState::result(
                        RowGroupDecoderState::Finished,
                        DecodeResult::Finished,
                    ));
                }

                // Apply any limit and offset
                let mut plan_builder = plan_builder
                    .limited(row_count)
                    .with_offset(self.offset)
                    .with_limit(self.limit)
                    .build_limited();

                let rows_after = plan_builder.num_rows_selected().unwrap_or(row_count);

                // Update running offset and limit for after the current row group is read
                if let Some(offset) = &mut self.offset {
                    // Reduction is either because of offset or limit, as limit is applied
                    // after offset has been "exhausted" can just use saturating sub here
                    *offset = offset.saturating_sub(rows_before - rows_after)
                }

                if rows_after == 0 {
                    // no rows left after applying limit/offset
                    return Ok(NextState::result(
                        RowGroupDecoderState::Finished,
                        DecodeResult::Finished,
                    ));
                }

                // NOTE(review): assumes `build_limited` caps the selection at
                // `limit`, so this subtraction cannot underflow — confirm
                if let Some(limit) = &mut self.limit {
                    *limit -= rows_after;
                }

                let data_request = DataRequestBuilder::new(
                    row_group_idx,
                    row_count,
                    self.batch_size,
                    &self.metadata,
                    &self.projection,
                )
                .with_selection(plan_builder.selection())
                .with_column_chunks(column_chunks)
                // Final projection fetch shouldn't expand selection for cache
                // so don't call with_cache_projection here
                .build();

                plan_builder = plan_builder.with_row_selection_policy(self.row_selection_policy);

                plan_builder = override_selector_strategy_if_needed(
                    plan_builder,
                    &self.projection,
                    self.row_group_offset_index(row_group_idx),
                );

                let row_group_info = RowGroupInfo {
                    row_group_idx,
                    row_count,
                    plan_builder,
                };

                NextState::again(RowGroupDecoderState::WaitingOnData {
                    row_group_info,
                    data_request,
                    cache_info,
                })
            }
            // Waiting on data to proceed with reading the output
            RowGroupDecoderState::WaitingOnData {
                row_group_info,
                data_request,
                cache_info,
            } => {
                let needed_ranges = data_request.needed_ranges(&self.buffers);
                if !needed_ranges.is_empty() {
                    // still need data: stay in this state and report the ranges
                    return Ok(NextState::result(
                        RowGroupDecoderState::WaitingOnData {
                            row_group_info,
                            data_request,
                            cache_info,
                        },
                        DecodeResult::NeedsData(needed_ranges),
                    ));
                }

                // otherwise we have all the data we need to proceed
                let RowGroupInfo {
                    row_group_idx,
                    row_count,
                    plan_builder,
                } = row_group_info;

                let row_group = data_request.try_into_in_memory_row_group(
                    row_group_idx,
                    row_count,
                    &self.metadata,
                    &self.projection,
                    &mut self.buffers,
                )?;

                let plan = plan_builder.build();

                // if we have any cached results, connect them up
                let array_reader_builder = ArrayReaderBuilder::new(&row_group, &self.metrics)
                    .with_parquet_metadata(&self.metadata);
                let array_reader = if let Some(cache_info) = cache_info.as_ref() {
                    // consumer side of the predicate cache built during filtering
                    let cache_options: CacheOptions = cache_info.builder().consumer();
                    array_reader_builder
                        .with_cache_options(Some(&cache_options))
                        .build_array_reader(self.fields.as_deref(), &self.projection)
                } else {
                    array_reader_builder
                        .build_array_reader(self.fields.as_deref(), &self.projection)
                }?;

                let reader = ParquetRecordBatchReader::new(array_reader, plan);
                NextState::result(RowGroupDecoderState::Finished, DecodeResult::Data(reader))
            }
            RowGroupDecoderState::Finished => {
                // nothing left to read
                NextState::result(RowGroupDecoderState::Finished, DecodeResult::Finished)
            }
        };
        Ok(result)
    }
638
639 /// Which columns should be cached?
640 ///
641 /// Returns the columns that are used by the filters *and* then used in the
642 /// final projection, excluding any nested columns.
643 fn compute_cache_projection(&self, row_group_idx: usize, filter: &RowFilter) -> ProjectionMask {
644 let meta = self.metadata.row_group(row_group_idx);
645 match self.compute_cache_projection_inner(filter) {
646 Some(projection) => projection,
647 None => ProjectionMask::none(meta.columns().len()),
648 }
649 }
650
651 fn compute_cache_projection_inner(&self, filter: &RowFilter) -> Option<ProjectionMask> {
652 // Do not compute the projection mask if the predicate cache is disabled
653 if self.max_predicate_cache_size == 0 {
654 return None;
655 }
656 let mut cache_projection = filter.predicates.first()?.projection().clone();
657 for predicate in filter.predicates.iter() {
658 cache_projection.union(predicate.projection());
659 }
660 cache_projection.intersect(&self.projection);
661 self.exclude_nested_columns_from_cache(&cache_projection)
662 }
663
    /// Exclude leaves belonging to roots that span multiple parquet leaves (i.e. nested columns)
    fn exclude_nested_columns_from_cache(&self, mask: &ProjectionMask) -> Option<ProjectionMask> {
        mask.without_nested_types(self.metadata.file_metadata().schema_descr())
    }
668
669 /// Get the offset index for the specified row group, if any
670 fn row_group_offset_index(&self, row_group_idx: usize) -> Option<&[OffsetIndexMetaData]> {
671 self.metadata
672 .offset_index()
673 .filter(|index| !index.is_empty())
674 .and_then(|index| index.get(row_group_idx))
675 .map(|columns| columns.as_slice())
676 }
677}
678
679/// Override the selection strategy if needed.
680///
681/// Some pages can be skipped during row-group construction if they are not read
682/// by the selections. This means that the data pages for those rows are never
683/// loaded and definition/repetition levels are never read. When using
684/// `RowSelections` selection works because `skip_records()` handles this
685/// case and skips the page accordingly.
686///
687/// However, with the current mask design, all values must be read and decoded
688/// and then a mask filter is applied. Thus if any pages are skipped during
689/// row-group construction, the data pages are missing and cannot be decoded.
690///
691/// A simple example:
692/// * the page size is 2, the mask is 100001, row selection should be read(1) skip(4) read(1)
693/// * the `ColumnChunkData` would be page1(10), page2(skipped), page3(01)
694///
695/// Using the row selection to skip(4), page2 won't be read at all, so in this
696/// case we can't decode all the rows and apply a mask. To correctly apply the
697/// bit mask, we need all 6 values be read, but page2 is not in memory.
698fn override_selector_strategy_if_needed(
699 plan_builder: ReadPlanBuilder,
700 projection_mask: &ProjectionMask,
701 offset_index: Option<&[OffsetIndexMetaData]>,
702) -> ReadPlanBuilder {
703 // override only applies to Auto policy, If the policy is already Mask or Selectors, respect that
704 let RowSelectionPolicy::Auto { .. } = plan_builder.row_selection_policy() else {
705 return plan_builder;
706 };
707
708 let preferred_strategy = plan_builder.resolve_selection_strategy();
709
710 let force_selectors = matches!(preferred_strategy, RowSelectionStrategy::Mask)
711 && plan_builder.selection().is_some_and(|selection| {
712 selection.should_force_selectors(projection_mask, offset_index)
713 });
714
715 let resolved_strategy = if force_selectors {
716 RowSelectionStrategy::Selectors
717 } else {
718 preferred_strategy
719 };
720
721 // override the plan builder strategy with the resolved one
722 let new_policy = match resolved_strategy {
723 RowSelectionStrategy::Mask => RowSelectionPolicy::Mask,
724 RowSelectionStrategy::Selectors => RowSelectionPolicy::Selectors,
725 };
726
727 plan_builder.with_row_selection_policy(new_policy)
728}
729
730#[cfg(test)]
731mod tests {
732 use super::*;
733
    #[test]
    // Verify that the size of RowGroupDecoderState does not grow too large.
    // The state is stored inline in RowGroupReaderBuilder, so growth here
    // bloats every decoder instance; if a variant legitimately grows, update
    // this constant deliberately (consider boxing large variant payloads).
    fn test_structure_size() {
        assert_eq!(std::mem::size_of::<RowGroupDecoderState>(), 200);
    }
739}