parquet/arrow/push_decoder/
remaining.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::DecodeResult;
19use crate::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection};
20use crate::arrow::push_decoder::reader_builder::RowGroupReaderBuilder;
21use crate::errors::ParquetError;
22use crate::file::metadata::ParquetMetaData;
23use bytes::Bytes;
24use std::collections::VecDeque;
25use std::ops::Range;
26use std::sync::Arc;
27
28/// State machine that tracks the remaining high level chunks (row groups) of
29/// Parquet data are left to read.
30///
31/// This is currently a row group, but the author aspires to extend the pattern
32/// to data boundaries other than RowGroups in the future.
33#[derive(Debug)]
34pub(crate) struct RemainingRowGroups {
35    /// The underlying Parquet metadata
36    parquet_metadata: Arc<ParquetMetaData>,
37
38    /// The row groups that have not yet been read
39    row_groups: VecDeque<usize>,
40
41    /// Remaining selection to apply to the next row groups
42    selection: Option<RowSelection>,
43
44    /// State for building the reader for the current row group
45    row_group_reader_builder: RowGroupReaderBuilder,
46}
47
48impl RemainingRowGroups {
49    pub fn new(
50        parquet_metadata: Arc<ParquetMetaData>,
51        row_groups: Vec<usize>,
52        selection: Option<RowSelection>,
53        row_group_reader_builder: RowGroupReaderBuilder,
54    ) -> Self {
55        Self {
56            parquet_metadata,
57            row_groups: VecDeque::from(row_groups),
58            selection,
59            row_group_reader_builder,
60        }
61    }
62
63    /// Push new data buffers that can be used to satisfy pending requests
64    pub fn push_data(&mut self, ranges: Vec<Range<u64>>, buffers: Vec<Bytes>) {
65        self.row_group_reader_builder.push_data(ranges, buffers);
66    }
67
68    /// Return the total number of bytes buffered so far
69    pub fn buffered_bytes(&self) -> u64 {
70        self.row_group_reader_builder.buffered_bytes()
71    }
72
73    /// returns [`ParquetRecordBatchReader`] suitable for reading the next
74    /// group of rows from the Parquet data, or the list of data ranges still
75    /// needed to proceed
76    pub fn try_next_reader(
77        &mut self,
78    ) -> Result<DecodeResult<ParquetRecordBatchReader>, ParquetError> {
79        loop {
80            // Are we ready yet to start reading?
81            let result: DecodeResult<ParquetRecordBatchReader> =
82                self.row_group_reader_builder.try_build()?;
83            match result {
84                DecodeResult::Finished => {
85                    // reader is done, proceed to the next row group
86                    // fall through to the next row group
87                    // This happens if the row group was completely filtered out
88                }
89                DecodeResult::NeedsData(ranges) => {
90                    // need more data to proceed
91                    return Ok(DecodeResult::NeedsData(ranges));
92                }
93                DecodeResult::Data(batch_reader) => {
94                    // ready to read the row group
95                    return Ok(DecodeResult::Data(batch_reader));
96                }
97            }
98
99            // No current reader, proceed to the next row group if any
100            let row_group_idx = match self.row_groups.pop_front() {
101                None => return Ok(DecodeResult::Finished),
102                Some(idx) => idx,
103            };
104
105            let row_count: usize = self
106                .parquet_metadata
107                .row_group(row_group_idx)
108                .num_rows()
109                .try_into()
110                .map_err(|e| ParquetError::General(format!("Row count overflow: {e}")))?;
111
112            let selection = self.selection.as_mut().map(|s| s.split_off(row_count));
113            self.row_group_reader_builder
114                .next_row_group(row_group_idx, row_count, selection)?;
115            // the next iteration will try to build the reader for the new row group
116        }
117    }
118}