parquet/arrow/push_decoder/remaining.rs
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::DecodeResult;
use crate::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection};
use crate::arrow::push_decoder::reader_builder::RowGroupReaderBuilder;
use crate::errors::ParquetError;
use crate::file::metadata::ParquetMetaData;
use bytes::Bytes;
use std::collections::VecDeque;
use std::ops::Range;
use std::sync::Arc;

/// State machine that tracks which high level chunks (row groups) of
/// Parquet data are left to read.
///
/// Each chunk is currently a row group, but the author aspires to extend the
/// pattern to data boundaries other than row groups in the future.
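///
/// # Example
///
/// A minimal sketch of how a caller might drive this state machine. The
/// `fetch_ranges` and `process` helpers are placeholders for caller-provided
/// I/O and batch handling; they are not part of this crate.
///
/// ```ignore
/// loop {
///     match remaining.try_next_reader()? {
///         // All requested row groups have been produced
///         DecodeResult::Finished => break,
///         // More bytes are required before a reader can be built:
///         // fetch the requested ranges and push them back in
///         DecodeResult::NeedsData(ranges) => {
///             let buffers = fetch_ranges(&ranges);
///             remaining.push_data(ranges, buffers);
///         }
///         // A reader for the next row group is ready; drain it
///         DecodeResult::Data(reader) => {
///             for batch in reader {
///                 process(batch?);
///             }
///         }
///     }
/// }
/// ```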
#[derive(Debug)]
pub(crate) struct RemainingRowGroups {
    /// The underlying Parquet metadata
    parquet_metadata: Arc<ParquetMetaData>,

    /// The row groups that have not yet been read
    row_groups: VecDeque<usize>,

    /// Remaining selection to apply to the next row groups
    selection: Option<RowSelection>,

    /// State for building the reader for the current row group
    row_group_reader_builder: RowGroupReaderBuilder,
}

impl RemainingRowGroups {
    pub fn new(
        parquet_metadata: Arc<ParquetMetaData>,
        row_groups: Vec<usize>,
        selection: Option<RowSelection>,
        row_group_reader_builder: RowGroupReaderBuilder,
    ) -> Self {
        Self {
            parquet_metadata,
            row_groups: VecDeque::from(row_groups),
            selection,
            row_group_reader_builder,
        }
    }

    /// Push new data buffers that can be used to satisfy pending requests
    pub fn push_data(&mut self, ranges: Vec<Range<u64>>, buffers: Vec<Bytes>) {
        self.row_group_reader_builder.push_data(ranges, buffers);
    }

    /// Return the total number of bytes buffered so far
    pub fn buffered_bytes(&self) -> u64 {
        self.row_group_reader_builder.buffered_bytes()
    }

    /// Returns a [`ParquetRecordBatchReader`] suitable for reading the next
    /// group of rows from the Parquet data, or the list of data ranges still
    /// needed to proceed.
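    ///
    /// Possible outcomes:
    ///
    /// * [`DecodeResult::Data`]: a reader for the next group of rows is ready
    /// * [`DecodeResult::NeedsData`]: the listed byte ranges must be supplied
    ///   via [`Self::push_data`] before a reader can be built
    /// * [`DecodeResult::Finished`]: all requested row groups have been read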
    pub fn try_next_reader(
        &mut self,
    ) -> Result<DecodeResult<ParquetRecordBatchReader>, ParquetError> {
        loop {
            // Are we ready yet to start reading?
            let result: DecodeResult<ParquetRecordBatchReader> =
                self.row_group_reader_builder.try_build()?;
            match result {
                DecodeResult::Finished => {
                    // The current reader is done; fall through to the next
                    // row group. This happens if the row group was
                    // completely filtered out.
                }
                DecodeResult::NeedsData(ranges) => {
                    // need more data to proceed
                    return Ok(DecodeResult::NeedsData(ranges));
                }
                DecodeResult::Data(batch_reader) => {
                    // ready to read the row group
                    return Ok(DecodeResult::Data(batch_reader));
                }
            }

            // No current reader, proceed to the next row group if any
            let row_group_idx = match self.row_groups.pop_front() {
                None => return Ok(DecodeResult::Finished),
                Some(idx) => idx,
            };

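            // Number of rows in this row group (stored as i64 in the
            // Parquet metadata)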
            let row_count: usize = self
                .parquet_metadata
                .row_group(row_group_idx)
                .num_rows()
                .try_into()
                .map_err(|e| ParquetError::General(format!("Row count overflow: {e}")))?;

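            // Split off the selection covering this row group's rows; the
            // remainder stays behind for subsequent row groups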
            let selection = self.selection.as_mut().map(|s| s.split_off(row_count));
            self.row_group_reader_builder
                .next_row_group(row_group_idx, row_count, selection)?;
            // the next iteration will try to build the reader for the new row group
        }
    }
}