parquet/arrow/push_decoder/remaining.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::DecodeResult;
19use crate::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection};
20use crate::arrow::push_decoder::reader_builder::RowGroupReaderBuilder;
21use crate::errors::ParquetError;
22use crate::file::metadata::ParquetMetaData;
23use bytes::Bytes;
24use std::collections::VecDeque;
25use std::ops::Range;
26use std::sync::Arc;
27
/// State machine that tracks the high level chunks (row groups) of Parquet
/// data that are left to read.
///
/// The chunk is currently always a row group, but the author aspires to extend
/// the pattern to data boundaries other than RowGroups in the future.
#[derive(Debug)]
pub(crate) struct RemainingRowGroups {
    /// The underlying Parquet metadata, consulted for each row group's row
    /// count when that row group is started
    parquet_metadata: Arc<ParquetMetaData>,

    /// Indices of the row groups that have not yet been started, consumed
    /// from the front in the order they were supplied
    row_groups: VecDeque<usize>,

    /// Remaining selection to apply to the next row groups. As each row group
    /// is started, its rows are split off the front of this selection.
    /// `None` means no selection was supplied.
    selection: Option<RowSelection>,

    /// State for building the reader for the current row group
    row_group_reader_builder: RowGroupReaderBuilder,
}
47
48impl RemainingRowGroups {
49 pub fn new(
50 parquet_metadata: Arc<ParquetMetaData>,
51 row_groups: Vec<usize>,
52 selection: Option<RowSelection>,
53 row_group_reader_builder: RowGroupReaderBuilder,
54 ) -> Self {
55 Self {
56 parquet_metadata,
57 row_groups: VecDeque::from(row_groups),
58 selection,
59 row_group_reader_builder,
60 }
61 }
62
63 /// Push new data buffers that can be used to satisfy pending requests
64 pub fn push_data(&mut self, ranges: Vec<Range<u64>>, buffers: Vec<Bytes>) {
65 self.row_group_reader_builder.push_data(ranges, buffers);
66 }
67
68 /// Return the total number of bytes buffered so far
69 pub fn buffered_bytes(&self) -> u64 {
70 self.row_group_reader_builder.buffered_bytes()
71 }
72
73 /// Clear any staged ranges currently buffered for future decode work
74 pub fn clear_all_ranges(&mut self) {
75 self.row_group_reader_builder.clear_all_ranges();
76 }
77
78 /// returns [`ParquetRecordBatchReader`] suitable for reading the next
79 /// group of rows from the Parquet data, or the list of data ranges still
80 /// needed to proceed
81 pub fn try_next_reader(
82 &mut self,
83 ) -> Result<DecodeResult<ParquetRecordBatchReader>, ParquetError> {
84 loop {
85 // Are we ready yet to start reading?
86 let result: DecodeResult<ParquetRecordBatchReader> =
87 self.row_group_reader_builder.try_build()?;
88 match result {
89 DecodeResult::Finished => {
90 // reader is done, proceed to the next row group
91 // fall through to the next row group
92 // This happens if the row group was completely filtered out
93 }
94 DecodeResult::NeedsData(ranges) => {
95 // need more data to proceed
96 return Ok(DecodeResult::NeedsData(ranges));
97 }
98 DecodeResult::Data(batch_reader) => {
99 // ready to read the row group
100 return Ok(DecodeResult::Data(batch_reader));
101 }
102 }
103
104 // No current reader, proceed to the next row group if any
105 let row_group_idx = match self.row_groups.pop_front() {
106 None => return Ok(DecodeResult::Finished),
107 Some(idx) => idx,
108 };
109
110 let row_count: usize = self
111 .parquet_metadata
112 .row_group(row_group_idx)
113 .num_rows()
114 .try_into()
115 .map_err(|e| ParquetError::General(format!("Row count overflow: {e}")))?;
116
117 let selection = self.selection.as_mut().map(|s| s.split_off(row_count));
118 self.row_group_reader_builder
119 .next_row_group(row_group_idx, row_count, selection)?;
120 // the next iteration will try to build the reader for the new row group
121 }
122 }
123}