parquet/arrow/push_decoder/reader_builder/data.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! [`DataRequest`] tracks and holds data needed to construct InMemoryRowGroups

use crate::arrow::ProjectionMask;
use crate::arrow::arrow_reader::RowSelection;
use crate::arrow::in_memory_row_group::{ColumnChunkData, FetchRanges, InMemoryRowGroup};
use crate::errors::ParquetError;
use crate::file::metadata::ParquetMetaData;
use crate::file::page_index::offset_index::OffsetIndexMetaData;
use crate::file::reader::ChunkReader;
use crate::util::push_buffers::PushBuffers;
use bytes::Bytes;
use std::ops::Range;
use std::sync::Arc;

/// Contains in-progress state to construct InMemoryRowGroups
///
/// See [`DataRequestBuilder`] for creating new requests
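///
/// The example below is an illustrative sketch of the intended call sequence
/// (not compiled; the real driver lives in the surrounding push decoder /
/// reader builder, and all variable names are placeholders):
///
/// ```ignore
/// // Describe the data needed to decode one row group
/// let request =
///     DataRequestBuilder::new(row_group_idx, row_count, batch_size, metadata, &projection)
///         .with_selection(selection.as_ref())
///         .build();
///
/// // Ask the caller to supply any byte ranges that are not yet buffered
/// for range in request.needed_ranges(&buffers) {
///     // fetch `range` from storage and add the bytes to `buffers`
/// }
///
/// // Once all ranges are available, convert the request into an InMemoryRowGroup
/// let row_group = request.try_into_in_memory_row_group(
///     row_group_idx,
///     row_count,
///     metadata,
///     &projection,
///     &mut buffers,
/// )?;
/// ```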
#[derive(Debug)]
pub(super) struct DataRequest {
    /// Any previously read column chunk data
    column_chunks: Vec<Option<Arc<ColumnChunkData>>>,
    /// The ranges of data that are needed next
    ranges: Vec<Range<u64>>,
    /// Optional page start offsets for each requested range. This is used
    /// to create the relevant InMemoryRowGroup
    page_start_offsets: Option<Vec<Vec<u64>>>,
}

impl DataRequest {
    /// Returns the ranges that are still needed to satisfy this request, or an
    /// empty vec if all ranges are already satisfied
    pub fn needed_ranges(&self, buffers: &PushBuffers) -> Vec<Range<u64>> {
        self.ranges
            .iter()
            .filter(|&range| !buffers.has_range(range))
            .cloned()
            .collect()
    }

    /// Returns the chunks from the buffers that satisfy this request
    fn get_chunks(&self, buffers: &PushBuffers) -> Result<Vec<Bytes>, ParquetError> {
        self.ranges
            .iter()
            .map(|range| {
                let length: usize = (range.end - range.start)
                    .try_into()
                    .expect("range length overflows usize");
                // all needed data should already be present; callers are expected
                // to check `needed_ranges` before requesting chunks
                buffers.get_bytes(range.start, length).map_err(|e| {
                    ParquetError::General(format!(
                        "Internal Error: missing data for range {range:?} in buffers: {e}",
                    ))
                })
            })
            .collect()
    }

    /// Creates a new InMemoryRowGroup and fills it with the provided data
    ///
    /// Assumes that all needed data is already present in the buffers, and
    /// clears the explicitly requested ranges from the buffers once consumed
    pub fn try_into_in_memory_row_group<'a>(
        self,
        row_group_idx: usize,
        row_count: usize,
        parquet_metadata: &'a ParquetMetaData,
        projection: &ProjectionMask,
        buffers: &mut PushBuffers,
    ) -> Result<InMemoryRowGroup<'a>, ParquetError> {
        let chunks = self.get_chunks(buffers)?;

        let Self {
            column_chunks,
            ranges,
            page_start_offsets,
        } = self;

        // Create an InMemoryRowGroup to hold the column chunk data that the
        // ArrowReaders will use for decoding
        let mut in_memory_row_group = InMemoryRowGroup {
            row_count,
            column_chunks,
            offset_index: get_offset_index(parquet_metadata, row_group_idx),
            row_group_idx,
            metadata: parquet_metadata,
        };

        in_memory_row_group.fill_column_chunks(projection, page_start_offsets, chunks);

        // Clear the ranges that were explicitly requested
        buffers.clear_ranges(&ranges);

        Ok(in_memory_row_group)
    }
}

/// Builder for [`DataRequest`]
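///
/// A minimal, non-compiled sketch of how the optional settings compose
/// (all values shown are placeholders):
///
/// ```ignore
/// let request =
///     DataRequestBuilder::new(row_group_idx, row_count, batch_size, metadata, &projection)
///         // optionally decode only a subset of rows
///         .with_selection(selection.as_ref())
///         // optionally cache decoded columns via RowGroupCache
///         .with_cache_projection(cache_projection.as_ref())
///         // reuse column chunk data read for an earlier request, if any
///         .with_column_chunks(previous_column_chunks)
///         .build();
/// ```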
pub(super) struct DataRequestBuilder<'a> {
    /// The row group index
    row_group_idx: usize,
    /// The number of rows in the row group
    row_count: usize,
    /// The batch size to read
    batch_size: usize,
    /// The parquet metadata
    parquet_metadata: &'a ParquetMetaData,
    /// The projection mask (which columns to read)
    projection: &'a ProjectionMask,
    /// Optional row selection to apply
    selection: Option<&'a RowSelection>,
    /// Optional projection mask if using
    /// [`RowGroupCache`](crate::arrow::array_reader::RowGroupCache)
    /// for caching decoded columns.
    cache_projection: Option<&'a ProjectionMask>,
    /// Any previously read column chunks
    column_chunks: Option<Vec<Option<Arc<ColumnChunkData>>>>,
}

impl<'a> DataRequestBuilder<'a> {
    pub(super) fn new(
        row_group_idx: usize,
        row_count: usize,
        batch_size: usize,
        parquet_metadata: &'a ParquetMetaData,
        projection: &'a ProjectionMask,
    ) -> Self {
        Self {
            row_group_idx,
            row_count,
            batch_size,
            parquet_metadata,
            projection,
            selection: None,
            cache_projection: None,
            column_chunks: None,
        }
    }

    /// Set an optional row selection to apply
    pub(super) fn with_selection(mut self, selection: Option<&'a RowSelection>) -> Self {
        self.selection = selection;
        self
    }

    /// Set the columns to cache, if any
    pub(super) fn with_cache_projection(
        mut self,
        cache_projection: Option<&'a ProjectionMask>,
    ) -> Self {
        self.cache_projection = cache_projection;
        self
    }

    /// Provide any previously read column chunks
    pub(super) fn with_column_chunks(
        mut self,
        column_chunks: Option<Vec<Option<Arc<ColumnChunkData>>>>,
    ) -> Self {
        self.column_chunks = column_chunks;
        self
    }

    /// Builds the [`DataRequest`], computing the byte ranges that must be
    /// fetched to decode the projected columns
    pub(crate) fn build(self) -> DataRequest {
        let Self {
            row_group_idx,
            row_count,
            batch_size,
            parquet_metadata,
            projection,
            selection,
            cache_projection,
            column_chunks,
        } = self;

        let row_group_meta_data = parquet_metadata.row_group(row_group_idx);

        // If no previously read column chunks are provided, create empty slots to hold them
        let column_chunks =
            column_chunks.unwrap_or_else(|| vec![None; row_group_meta_data.columns().len()]);

        // Create a temporary InMemoryRowGroup; it is used only to compute
        // which byte ranges (pages) need to be fetched for decoding
        let row_group = InMemoryRowGroup {
            row_count,
            column_chunks,
            offset_index: get_offset_index(parquet_metadata, row_group_idx),
            row_group_idx,
            metadata: parquet_metadata,
        };

        let FetchRanges {
            ranges,
            page_start_offsets,
        } = row_group.fetch_ranges(projection, selection, batch_size, cache_projection);

        DataRequest {
            // Save any previously read column chunks
            column_chunks: row_group.column_chunks,
            ranges,
            page_start_offsets,
        }
    }
}

/// Returns the offset index for the given row group, if present
fn get_offset_index(
    parquet_metadata: &ParquetMetaData,
    row_group_idx: usize,
) -> Option<&[OffsetIndexMetaData]> {
    parquet_metadata
        .offset_index()
        // filter out empty offset indexes (older versions specified Some(vec![]) when none was present)
        .filter(|index| !index.is_empty())
        .map(|x| x[row_group_idx].as_slice())
}