parquet/arrow/in_memory_row_group.rs

use crate::arrow::ProjectionMask;
use crate::arrow::array_reader::RowGroups;
use crate::arrow::arrow_reader::RowSelection;
use crate::column::page::{PageIterator, PageReader};
use crate::errors::ParquetError;
use crate::file::metadata::ParquetMetaData;
use crate::file::page_index::offset_index::OffsetIndexMetaData;
use crate::file::reader::{ChunkReader, Length, SerializedPageReader};
use bytes::{Buf, Bytes};
use std::ops::Range;
use std::sync::Arc;

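/// An in-memory collection of column chunks for a single row group.
///
/// `column_chunks` starts out with `None` for every leaf column and is populated
/// with data fetched from the underlying input; the [`RowGroups`] implementation
/// below then serves page readers over the buffered bytes.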
#[derive(Debug)]
pub(crate) struct InMemoryRowGroup<'a> {
    /// Per-column offset index (page locations), if the file provides one
    pub(crate) offset_index: Option<&'a [OffsetIndexMetaData]>,
    /// Fetched data for each leaf column, `None` until the column is fetched
    pub(crate) column_chunks: Vec<Option<Arc<ColumnChunkData>>>,
    /// Number of rows in this row group
    pub(crate) row_count: usize,
    /// Index of this row group within the file
    pub(crate) row_group_idx: usize,
    /// Metadata for the entire Parquet file
    pub(crate) metadata: &'a ParquetMetaData,
}

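/// The byte ranges that still need to be fetched for a row group, as computed by
/// [`InMemoryRowGroup::fetch_ranges`].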
#[derive(Debug)]
pub(crate) struct FetchRanges {
    /// Byte ranges to read from the underlying input
    pub(crate) ranges: Vec<Range<u64>>,
    /// For each fetched column, the start offset of every fetched page;
    /// `None` when whole column chunks are fetched
    pub(crate) page_start_offsets: Option<Vec<Vec<u64>>>,
}

impl InMemoryRowGroup<'_> {
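    /// Compute the byte ranges that must be fetched to read the columns selected
    /// by `projection`, skipping columns already present in `column_chunks`.
    ///
    /// If a row `selection` and an offset index are available, only the pages
    /// overlapping the selection are requested; columns included in `cache_mask`
    /// instead use the selection expanded to `batch_size` boundaries. Without a
    /// selection or offset index, the full byte range of each projected column
    /// chunk is requested.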
    pub(crate) fn fetch_ranges(
        &self,
        projection: &ProjectionMask,
        selection: Option<&RowSelection>,
        batch_size: usize,
        cache_mask: Option<&ProjectionMask>,
    ) -> FetchRanges {
        let metadata = self.metadata.row_group(self.row_group_idx);
        if let Some((selection, offset_index)) = selection.zip(self.offset_index) {
            let expanded_selection =
                selection.expand_to_batch_boundaries(batch_size, self.row_count);

            // Record the start offset of every fetched range, per column, so that
            // `fill_column_chunks` can reassemble the sparse column chunks
            let mut page_start_offsets: Vec<Vec<u64>> = vec![];

            let ranges = self
                .column_chunks
                .iter()
                .zip(metadata.columns())
                .enumerate()
                .filter(|&(idx, (chunk, _chunk_meta))| {
                    chunk.is_none() && projection.leaf_included(idx)
                })
                .flat_map(|(idx, (_chunk, chunk_meta))| {
                    let mut ranges: Vec<Range<u64>> = vec![];
                    // If the first data page does not start at the beginning of the
                    // column chunk, also fetch the leading bytes (typically the
                    // dictionary page)
                    let (start, _len) = chunk_meta.byte_range();
                    match offset_index[idx].page_locations.first() {
                        Some(first) if first.offset as u64 != start => {
                            ranges.push(start..first.offset as u64);
                        }
                        _ => (),
                    }

                    // Columns covered by `cache_mask` fetch the selection expanded to
                    // batch boundaries; all other columns fetch only the selected rows
                    let use_expanded = cache_mask.map(|m| m.leaf_included(idx)).unwrap_or(false);
                    if use_expanded {
                        ranges.extend(
                            expanded_selection.scan_ranges(&offset_index[idx].page_locations),
                        );
                    } else {
                        ranges.extend(selection.scan_ranges(&offset_index[idx].page_locations));
                    }
                    page_start_offsets.push(ranges.iter().map(|range| range.start).collect());

                    ranges
                })
                .collect();
            FetchRanges {
                ranges,
                page_start_offsets: Some(page_start_offsets),
            }
        } else {
            // No selection or offset index: fetch the full byte range of every
            // projected column chunk that has not been fetched yet
            let ranges = self
                .column_chunks
                .iter()
                .enumerate()
                .filter(|&(idx, chunk)| chunk.is_none() && projection.leaf_included(idx))
                .map(|(idx, _chunk)| {
                    let column = metadata.column(idx);
                    let (start, length) = column.byte_range();
                    start..(start + length)
                })
                .collect();
            FetchRanges {
                ranges,
                page_start_offsets: None,
            }
        }
    }

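    /// Populate `column_chunks` with the fetched data for the columns selected by
    /// `projection`.
    ///
    /// `chunk_data` must yield one `Bytes` buffer per range returned by
    /// [`Self::fetch_ranges`], in the same order. When `page_start_offsets` is
    /// provided, each column is stored as [`ColumnChunkData::Sparse`] keyed by the
    /// page start offsets; otherwise each column is stored as a single
    /// [`ColumnChunkData::Dense`] buffer.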
    pub(crate) fn fill_column_chunks<I>(
        &mut self,
        projection: &ProjectionMask,
        page_start_offsets: Option<Vec<Vec<u64>>>,
        chunk_data: I,
    ) where
        I: IntoIterator<Item = Bytes>,
    {
        let mut chunk_data = chunk_data.into_iter();
        let metadata = self.metadata.row_group(self.row_group_idx);
        if let Some(page_start_offsets) = page_start_offsets {
            let mut page_start_offsets = page_start_offsets.into_iter();

            for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
                if chunk.is_some() || !projection.leaf_included(idx) {
                    continue;
                }

                // Pair each fetched page buffer with its recorded start offset
                if let Some(offsets) = page_start_offsets.next() {
                    let mut chunks = Vec::with_capacity(offsets.len());
                    for _ in 0..offsets.len() {
                        chunks.push(chunk_data.next().unwrap());
                    }

                    *chunk = Some(Arc::new(ColumnChunkData::Sparse {
                        length: metadata.column(idx).byte_range().1 as usize,
                        data: offsets
                            .into_iter()
                            .map(|x| x as usize)
                            .zip(chunks.into_iter())
                            .collect(),
                    }))
                }
            }
        } else {
            for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
                if chunk.is_some() || !projection.leaf_included(idx) {
                    continue;
                }

                // Each projected column receives one contiguous buffer
                if let Some(data) = chunk_data.next() {
                    *chunk = Some(Arc::new(ColumnChunkData::Dense {
                        offset: metadata.column(idx).byte_range().0 as usize,
                        data,
                    }));
                }
            }
        }
    }
}
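// Illustrative sketch (not actual code from this crate): a caller is expected to
// combine `fetch_ranges` and `fill_column_chunks` roughly as follows, where
// `fetch_bytes` stands in for whatever I/O the surrounding reader performs:
//
//     let FetchRanges { ranges, page_start_offsets } =
//         row_group.fetch_ranges(&projection, selection.as_ref(), batch_size, None);
//     let buffers: Vec<Bytes> = fetch_bytes(&ranges)?; // one `Bytes` per range, in order
//     row_group.fill_column_chunks(&projection, page_start_offsets, buffers);
//     // the populated row group can now serve pages via `RowGroups::column_chunks`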

impl RowGroups for InMemoryRowGroup<'_> {
    fn num_rows(&self) -> usize {
        self.row_count
    }

    fn column_chunks(&self, i: usize) -> crate::errors::Result<Box<dyn PageIterator>> {
        match &self.column_chunks[i] {
            None => Err(ParquetError::General(format!(
                "Invalid column index {i}, column was not fetched"
            ))),
            Some(data) => {
                // Page locations from the offset index, if present, let the page
                // reader locate each page within the buffered data
                let page_locations = self
                    .offset_index
                    .filter(|index| !index.is_empty())
                    .map(|index| index[i].page_locations.clone());
                let column_chunk_metadata = self.metadata.row_group(self.row_group_idx).column(i);
                let page_reader = SerializedPageReader::new(
                    data.clone(),
                    column_chunk_metadata,
                    self.row_count,
                    page_locations,
                )?;
                let page_reader = page_reader.add_crypto_context(
                    self.row_group_idx,
                    i,
                    self.metadata,
                    column_chunk_metadata,
                )?;

                let page_reader: Box<dyn PageReader> = Box::new(page_reader);

                Ok(Box::new(ColumnChunkIterator {
                    reader: Some(Ok(page_reader)),
                }))
            }
        }
    }
}

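/// The raw bytes backing a single column chunk, held either as one contiguous
/// buffer or as just the pages that were actually fetched.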
#[derive(Clone, Debug)]
pub(crate) enum ColumnChunkData {
    /// Only the pages selected for reading were fetched
    Sparse {
        /// Total length of the column chunk in the file, in bytes
        length: usize,
        /// The fetched pages, as `(file offset, data)` pairs sorted by offset
        data: Vec<(usize, Bytes)>,
    },
    /// The entire column chunk was fetched as one contiguous buffer starting at
    /// `offset`
    Dense { offset: usize, data: Bytes },
}

impl ColumnChunkData {
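    /// Return the data for this column chunk starting at file offset `start`.
    ///
    /// For [`Self::Sparse`] data, `start` must exactly match the start offset of a
    /// fetched page; any other offset returns an error.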
    fn get(&self, start: u64) -> crate::errors::Result<Bytes> {
        match &self {
            ColumnChunkData::Sparse { data, .. } => data
                .binary_search_by_key(&start, |(offset, _)| *offset as u64)
                .map(|idx| data[idx].1.clone())
                .map_err(|_| {
                    ParquetError::General(format!(
                        "Invalid offset in sparse column chunk data: {start}"
                    ))
                }),
            ColumnChunkData::Dense { offset, data } => {
                let start = start as usize - *offset;
                Ok(data.slice(start..))
            }
        }
    }
}

impl Length for ColumnChunkData {
    fn len(&self) -> u64 {
        match &self {
            // `Sparse` reports the full on-disk length of the column chunk,
            // not just the bytes that were fetched
            ColumnChunkData::Sparse { length, .. } => *length as u64,
            ColumnChunkData::Dense { data, .. } => data.len() as u64,
        }
    }
}

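/// Implementing [`ChunkReader`] allows [`SerializedPageReader`] to read pages
/// directly from the in-memory column chunk data.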
impl ChunkReader for ColumnChunkData {
    type T = bytes::buf::Reader<Bytes>;

    fn get_read(&self, start: u64) -> crate::errors::Result<Self::T> {
        Ok(self.get(start)?.reader())
    }

    fn get_bytes(&self, start: u64, length: usize) -> crate::errors::Result<Bytes> {
        Ok(self.get(start)?.slice(..length))
    }
}

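/// A [`PageIterator`] over a single column chunk, yielding exactly one
/// [`PageReader`] (or an error) before returning `None`.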
struct ColumnChunkIterator {
    reader: Option<crate::errors::Result<Box<dyn PageReader>>>,
}

impl Iterator for ColumnChunkIterator {
    type Item = crate::errors::Result<Box<dyn PageReader>>;

    fn next(&mut self) -> Option<Self::Item> {
        self.reader.take()
    }
}

impl PageIterator for ColumnChunkIterator {}