parquet/arrow/push_decoder/reader_builder/
data.rs1use crate::arrow::ProjectionMask;
21use crate::arrow::arrow_reader::RowSelection;
22use crate::arrow::in_memory_row_group::{ColumnChunkData, FetchRanges, InMemoryRowGroup};
23use crate::errors::ParquetError;
24use crate::file::metadata::ParquetMetaData;
25use crate::file::page_index::offset_index::OffsetIndexMetaData;
26use crate::file::reader::ChunkReader;
27use crate::util::push_buffers::PushBuffers;
28use bytes::Bytes;
29use std::ops::Range;
30use std::sync::Arc;
31
32#[derive(Debug)]
36pub(super) struct DataRequest {
37 column_chunks: Vec<Option<Arc<ColumnChunkData>>>,
39 ranges: Vec<Range<u64>>,
41 page_start_offsets: Option<Vec<Vec<u64>>>,
44}
45
46impl DataRequest {
47 pub fn needed_ranges(&self, buffers: &PushBuffers) -> Vec<Range<u64>> {
50 self.ranges
51 .iter()
52 .filter(|&range| !buffers.has_range(range))
53 .cloned()
54 .collect()
55 }
56
57 fn get_chunks(&self, buffers: &PushBuffers) -> Result<Vec<Bytes>, ParquetError> {
59 self.ranges
60 .iter()
61 .map(|range| {
62 let length: usize = (range.end - range.start)
63 .try_into()
64 .expect("overflow for offset");
65 buffers.get_bytes(range.start, length).map_err(|e| {
67 ParquetError::General(format!(
68 "Internal Error missing data for range {range:?} in buffers: {e}",
69 ))
70 })
71 })
72 .collect()
73 }
74
75 pub fn try_into_in_memory_row_group<'a>(
80 self,
81 row_group_idx: usize,
82 row_count: usize,
83 parquet_metadata: &'a ParquetMetaData,
84 projection: &ProjectionMask,
85 buffers: &mut PushBuffers,
86 ) -> Result<InMemoryRowGroup<'a>, ParquetError> {
87 let chunks = self.get_chunks(buffers)?;
88
89 let Self {
90 column_chunks,
91 ranges,
92 page_start_offsets,
93 } = self;
94
95 let mut in_memory_row_group = InMemoryRowGroup {
99 row_count,
100 column_chunks,
101 offset_index: get_offset_index(parquet_metadata, row_group_idx),
102 row_group_idx,
103 metadata: parquet_metadata,
104 };
105
106 in_memory_row_group.fill_column_chunks(projection, page_start_offsets, chunks);
107
108 buffers.clear_ranges(&ranges);
110
111 Ok(in_memory_row_group)
112 }
113}
114
115pub(super) struct DataRequestBuilder<'a> {
117 row_group_idx: usize,
119 row_count: usize,
121 batch_size: usize,
123 parquet_metadata: &'a ParquetMetaData,
125 projection: &'a ProjectionMask,
127 selection: Option<&'a RowSelection>,
129 cache_projection: Option<&'a ProjectionMask>,
133 column_chunks: Option<Vec<Option<Arc<ColumnChunkData>>>>,
135}
136
137impl<'a> DataRequestBuilder<'a> {
138 pub(super) fn new(
139 row_group_idx: usize,
140 row_count: usize,
141 batch_size: usize,
142 parquet_metadata: &'a ParquetMetaData,
143 projection: &'a ProjectionMask,
144 ) -> Self {
145 Self {
146 row_group_idx,
147 row_count,
148 batch_size,
149 parquet_metadata,
150 projection,
151 selection: None,
152 cache_projection: None,
153 column_chunks: None,
154 }
155 }
156
157 pub(super) fn with_selection(mut self, selection: Option<&'a RowSelection>) -> Self {
159 self.selection = selection;
160 self
161 }
162
163 pub(super) fn with_cache_projection(
165 mut self,
166 cache_projection: Option<&'a ProjectionMask>,
167 ) -> Self {
168 self.cache_projection = cache_projection;
169 self
170 }
171
172 pub(super) fn with_column_chunks(
174 mut self,
175 column_chunks: Option<Vec<Option<Arc<ColumnChunkData>>>>,
176 ) -> Self {
177 self.column_chunks = column_chunks;
178 self
179 }
180
181 pub(crate) fn build(self) -> DataRequest {
182 let Self {
183 row_group_idx,
184 row_count,
185 batch_size,
186 parquet_metadata,
187 projection,
188 selection,
189 cache_projection,
190 column_chunks,
191 } = self;
192
193 let row_group_meta_data = parquet_metadata.row_group(row_group_idx);
194
195 let column_chunks =
197 column_chunks.unwrap_or_else(|| vec![None; row_group_meta_data.columns().len()]);
198
199 let row_group = InMemoryRowGroup {
203 row_count,
204 column_chunks,
205 offset_index: get_offset_index(parquet_metadata, row_group_idx),
206 row_group_idx,
207 metadata: parquet_metadata,
208 };
209
210 let FetchRanges {
211 ranges,
212 page_start_offsets,
213 } = row_group.fetch_ranges(projection, selection, batch_size, cache_projection);
214
215 DataRequest {
216 column_chunks: row_group.column_chunks,
218 ranges,
219 page_start_offsets,
220 }
221 }
222}
223
224fn get_offset_index(
225 parquet_metadata: &ParquetMetaData,
226 row_group_idx: usize,
227) -> Option<&[OffsetIndexMetaData]> {
228 parquet_metadata
229 .offset_index()
230 .filter(|index| !index.is_empty())
232 .map(|x| x[row_group_idx].as_slice())
233}