// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::DecodeResult;
use crate::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection};
use crate::arrow::push_decoder::reader_builder::RowGroupReaderBuilder;
use crate::errors::ParquetError;
use bytes::Bytes;
use std::collections::VecDeque;
use std::ops::Range;
use std::sync::Arc;

28/// State machine that tracks the remaining high level chunks (row groups) of
29/// Parquet data are left to read.
30///
31/// This is currently a row group, but the author aspires to extend the pattern
32/// to data boundaries other than RowGroups in the future.
33#[derive(Debug)]
34pub(crate) struct RemainingRowGroups {
35    /// The underlying Parquet metadata
36    parquet_metadata: Arc<ParquetMetaData>,
37
38    /// The row groups that have not yet been read
39    row_groups: VecDeque<usize>,
40
41    /// Remaining selection to apply to the next row groups
42    selection: Option<RowSelection>,
43
44    /// State for building the reader for the current row group
45    row_group_reader_builder: RowGroupReaderBuilder,
46}
48impl RemainingRowGroups {
49    pub fn new(
50        parquet_metadata: Arc<ParquetMetaData>,
51        row_groups: Vec<usize>,
52        selection: Option<RowSelection>,
53        row_group_reader_builder: RowGroupReaderBuilder,
54    ) -> Self {
55        Self {
56            parquet_metadata,
57            row_groups: VecDeque::from(row_groups),
58            selection,
59            row_group_reader_builder,
60        }
61    }
62
63    /// Push new data buffers that can be used to satisfy pending requests
64    pub fn push_data(&mut self, ranges: Vec<Range<u64>>, buffers: Vec<Bytes>) {
65        self.row_group_reader_builder.push_data(ranges, buffers);
66    }
67
68    /// Return the total number of bytes buffered so far
69    pub fn buffered_bytes(&self) -> u64 {
70        self.row_group_reader_builder.buffered_bytes()
71    }
72
73    /// Clear any staged ranges currently buffered for future decode work
74    pub fn clear_all_ranges(&mut self) {
75        self.row_group_reader_builder.clear_all_ranges();
76    }
77
78    /// returns [`ParquetRecordBatchReader`] suitable for reading the next
79    /// group of rows from the Parquet data, or the list of data ranges still
80    /// needed to proceed
81    pub fn try_next_reader(
82        &mut self,
83    ) -> Result<DecodeResult<ParquetRecordBatchReader>, ParquetError> {
84        loop {
85            // Are we ready yet to start reading?
86            let result: DecodeResult<ParquetRecordBatchReader> =
87                self.row_group_reader_builder.try_build()?;
88            match result {
89                DecodeResult::Finished => {
90                    // reader is done, proceed to the next row group
91                    // fall through to the next row group
92                    // This happens if the row group was completely filtered out
93                }
94                DecodeResult::NeedsData(ranges) => {
95                    // need more data to proceed
96                    return Ok(DecodeResult::NeedsData(ranges));
97                }
98                DecodeResult::Data(batch_reader) => {
99                    // ready to read the row group
100                    return Ok(DecodeResult::Data(batch_reader));
101                }
102            }
103
104            // No current reader, proceed to the next row group if any
105            let row_group_idx = match self.row_groups.pop_front() {
106                None => return Ok(DecodeResult::Finished),
107                Some(idx) => idx,
108            };
109
110            let row_count: usize = self
111                .parquet_metadata
112                .row_group(row_group_idx)
113                .num_rows()
114                .try_into()
115                .map_err(|e| ParquetError::General(format!("Row count overflow: {e}")))?;
116
117            let selection = self.selection.as_mut().map(|s| s.split_off(row_count));
118            self.row_group_reader_builder
119                .next_row_group(row_group_idx, row_count, selection)?;
120            // the next iteration will try to build the reader for the new row group
121        }
122    }
123}