parquet/arrow/in_memory_row_group.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::arrow::ProjectionMask;
use crate::arrow::array_reader::RowGroups;
use crate::arrow::arrow_reader::RowSelection;
use crate::column::page::{PageIterator, PageReader};
use crate::errors::ParquetError;
use crate::file::metadata::ParquetMetaData;
use crate::file::page_index::offset_index::OffsetIndexMetaData;
use crate::file::reader::{ChunkReader, Length, SerializedPageReader};
use bytes::{Buf, Bytes};
use std::ops::Range;
use std::sync::Arc;

/// An in-memory collection of column chunks
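///
/// A row group starts with every entry in `column_chunks` set to `None`. The intended
/// flow (driven by the higher-level readers in this crate) is: call `fetch_ranges` to
/// find out which byte ranges are needed for the projected columns, perform the I/O
/// for those ranges, and then pass the resulting bytes to `fill_column_chunks` so that
/// the `RowGroups` implementation below can serve pages for each column.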
#[derive(Debug)]
pub(crate) struct InMemoryRowGroup<'a> {
    pub(crate) offset_index: Option<&'a [OffsetIndexMetaData]>,
    /// Column chunks for this row group
    pub(crate) column_chunks: Vec<Option<Arc<ColumnChunkData>>>,
    pub(crate) row_count: usize,
    pub(crate) row_group_idx: usize,
    pub(crate) metadata: &'a ParquetMetaData,
}

/// What ranges to fetch for the columns in this row group
#[derive(Debug)]
pub(crate) struct FetchRanges {
    /// The byte ranges to fetch
    pub(crate) ranges: Vec<Range<u64>>,
    /// If `Some`, the start offsets of each page for each column chunk
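    ///
    /// There is one inner `Vec` per column that will be fetched (i.e. per column that is
    /// selected by the projection and has not been fetched yet), in leaf-column order.
    /// Each inner `Vec` holds the start offset of every byte range fetched for that
    /// column, including a separately fetched dictionary page range, if any. These
    /// offsets are passed back to `fill_column_chunks` to reassemble the sparse chunks.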
    pub(crate) page_start_offsets: Option<Vec<Vec<u64>>>,
}

impl InMemoryRowGroup<'_> {
    /// Returns the byte ranges to fetch for the columns specified in
    /// `projection` and `selection`.
    ///
    /// `cache_mask` indicates which columns, if any, are being cached by
    /// [`RowGroupCache`](crate::arrow::array_reader::RowGroupCache).
    /// The `selection` for cached columns is expanded to batch boundaries to simplify
    /// accounting for what data is cached.
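    ///
    /// A rough sketch of the batch-boundary expansion for cached columns (the values
    /// are illustrative only):
    ///
    /// ```text
    /// batch_size = 1024, row_count = 4096
    /// selection          = select rows 1500..1600
    /// expanded selection = select rows 1024..2048   // snapped to batch boundaries
    /// ```
    ///
    /// Columns in `cache_mask` fetch the pages overlapping the expanded selection;
    /// all other columns fetch only the pages overlapping the original selection.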
    pub(crate) fn fetch_ranges(
        &self,
        projection: &ProjectionMask,
        selection: Option<&RowSelection>,
        batch_size: usize,
        cache_mask: Option<&ProjectionMask>,
    ) -> FetchRanges {
        let metadata = self.metadata.row_group(self.row_group_idx);
        if let Some((selection, offset_index)) = selection.zip(self.offset_index) {
            let expanded_selection =
                selection.expand_to_batch_boundaries(batch_size, self.row_count);

            // If we have a `RowSelection` and an `OffsetIndex` then only fetch
            // pages required for the `RowSelection`
            // Consider preallocating outer vec: https://github.com/apache/arrow-rs/issues/8667
            let mut page_start_offsets: Vec<Vec<u64>> = vec![];

            let ranges = self
                .column_chunks
                .iter()
                .zip(metadata.columns())
                .enumerate()
                .filter(|&(idx, (chunk, _chunk_meta))| {
                    chunk.is_none() && projection.leaf_included(idx)
                })
                .flat_map(|(idx, (_chunk, chunk_meta))| {
                    // If the first page does not start at the beginning of the column,
                    // then we need to also fetch a dictionary page.
                    let mut ranges: Vec<Range<u64>> = vec![];
                    let (start, _len) = chunk_meta.byte_range();
                    match offset_index[idx].page_locations.first() {
                        Some(first) if first.offset as u64 != start => {
                            ranges.push(start..first.offset as u64);
                        }
                        _ => (),
                    }

                    // Expand selection to batch boundaries if needed for caching
                    // (see doc comment for this function for details on `cache_mask`)
                    let use_expanded = cache_mask.map(|m| m.leaf_included(idx)).unwrap_or(false);
                    if use_expanded {
                        ranges.extend(
                            expanded_selection.scan_ranges(&offset_index[idx].page_locations),
                        );
                    } else {
                        ranges.extend(selection.scan_ranges(&offset_index[idx].page_locations));
                    }
                    page_start_offsets.push(ranges.iter().map(|range| range.start).collect());

                    ranges
                })
                .collect();
            FetchRanges {
                ranges,
                page_start_offsets: Some(page_start_offsets),
            }
        } else {
            let ranges = self
                .column_chunks
                .iter()
                .enumerate()
                .filter(|&(idx, chunk)| chunk.is_none() && projection.leaf_included(idx))
                .map(|(idx, _chunk)| {
                    let column = metadata.column(idx);
                    let (start, length) = column.byte_range();
                    start..(start + length)
                })
                .collect();
            FetchRanges {
                ranges,
                page_start_offsets: None,
            }
        }
    }

    /// Fills in `self.column_chunks` with the data fetched from `chunk_data`.
    ///
    /// This function **must** be called with the data from the ranges returned by
    /// `fetch_ranges` and the corresponding `page_start_offsets`, using the exact same
    /// `projection` and `selection`.
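    ///
    /// A minimal sketch of the expected pairing (how the ranges are actually fetched is
    /// up to the caller; `fetch_bytes_for` is a placeholder for that I/O step):
    ///
    /// ```text
    /// let fetch = row_group.fetch_ranges(&projection, selection, batch_size, cache_mask);
    /// let data: Vec<Bytes> = fetch_bytes_for(&fetch.ranges); // one `Bytes` per range, same order
    /// row_group.fill_column_chunks(&projection, fetch.page_start_offsets, data);
    /// ```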
    pub(crate) fn fill_column_chunks<I>(
        &mut self,
        projection: &ProjectionMask,
        page_start_offsets: Option<Vec<Vec<u64>>>,
        chunk_data: I,
    ) where
        I: IntoIterator<Item = Bytes>,
    {
        let mut chunk_data = chunk_data.into_iter();
        let metadata = self.metadata.row_group(self.row_group_idx);
        if let Some(page_start_offsets) = page_start_offsets {
            // A `RowSelection` and an `OffsetIndex` were used, so `chunk_data` contains only
            // the pages required for the `RowSelection`; reassemble them into sparse chunks
            let mut page_start_offsets = page_start_offsets.into_iter();

            for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
                if chunk.is_some() || !projection.leaf_included(idx) {
                    continue;
                }

                if let Some(offsets) = page_start_offsets.next() {
                    let mut chunks = Vec::with_capacity(offsets.len());
                    for _ in 0..offsets.len() {
                        chunks.push(chunk_data.next().unwrap());
                    }

                    *chunk = Some(Arc::new(ColumnChunkData::Sparse {
                        length: metadata.column(idx).byte_range().1 as usize,
                        data: offsets
                            .into_iter()
                            .map(|x| x as usize)
                            .zip(chunks.into_iter())
                            .collect(),
                    }))
                }
            }
        } else {
            for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
                if chunk.is_some() || !projection.leaf_included(idx) {
                    continue;
                }

                if let Some(data) = chunk_data.next() {
                    *chunk = Some(Arc::new(ColumnChunkData::Dense {
                        offset: metadata.column(idx).byte_range().0 as usize,
                        data,
                    }));
                }
            }
        }
    }
}

impl RowGroups for InMemoryRowGroup<'_> {
    fn num_rows(&self) -> usize {
        self.row_count
    }

    /// Return chunks for column i
    fn column_chunks(&self, i: usize) -> crate::errors::Result<Box<dyn PageIterator>> {
        match &self.column_chunks[i] {
            None => Err(ParquetError::General(format!(
                "Invalid column index {i}, column was not fetched"
            ))),
            Some(data) => {
                let page_locations = self
                    .offset_index
                    // filter out empty offset indexes (old versions specified Some(vec![]) when not present)
                    .filter(|index| !index.is_empty())
                    .map(|index| index[i].page_locations.clone());
                let column_chunk_metadata = self.metadata.row_group(self.row_group_idx).column(i);
                let page_reader = SerializedPageReader::new(
                    data.clone(),
                    column_chunk_metadata,
                    self.row_count,
                    page_locations,
                )?;
                let page_reader = page_reader.add_crypto_context(
                    self.row_group_idx,
                    i,
                    self.metadata,
                    column_chunk_metadata,
                )?;

                let page_reader: Box<dyn PageReader> = Box::new(page_reader);

                Ok(Box::new(ColumnChunkIterator {
                    reader: Some(Ok(page_reader)),
                }))
            }
        }
    }
}

/// An in-memory column chunk.
/// This allows us to hold either dense column chunks or sparse column chunks and easily
/// access them by offset.
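///
/// A sparse chunk might look roughly like the following (the offsets and variable
/// names are illustrative only):
///
/// ```text
/// ColumnChunkData::Sparse {
///     length: 10_000,                      // length of the full column chunk in the file
///     data: vec![
///         (4_000, dictionary_page_bytes),  // dictionary page, fetched separately
///         (6_500, data_page_bytes),        // only the data page(s) that were selected
///     ],
/// }
/// ```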
#[derive(Clone, Debug)]
pub(crate) enum ColumnChunkData {
    /// Column chunk data representing only a subset of data pages.
    /// For example, if a row selection (possibly caused by a filter in a query) causes us to read only
    /// a subset of the rows in the column.
    Sparse {
        /// Length of the full column chunk
        length: usize,
        /// Subset of data pages included in this sparse chunk.
        ///
        /// Each element is a tuple of (page offset within file, page data).
        /// Each entry is a complete page and the list is ordered by offset.
        data: Vec<(usize, Bytes)>,
    },
    /// Full column chunk and the offset within the original file
    Dense { offset: usize, data: Bytes },
}

impl ColumnChunkData {
    /// Return the data for this column chunk at the given offset
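    ///
    /// For `ColumnChunkData::Sparse` the offset must exactly match the recorded start of
    /// one of the fetched pages (it is looked up with a binary search); any other offset
    /// is an error. For `ColumnChunkData::Dense` any offset within the chunk is accepted
    /// and the data from that offset to the end of the chunk is returned.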
    fn get(&self, start: u64) -> crate::errors::Result<Bytes> {
        match &self {
            ColumnChunkData::Sparse { data, .. } => data
                .binary_search_by_key(&start, |(offset, _)| *offset as u64)
                .map(|idx| data[idx].1.clone())
                .map_err(|_| {
                    ParquetError::General(format!(
                        "Invalid offset in sparse column chunk data: {start}"
                    ))
                }),
            ColumnChunkData::Dense { offset, data } => {
                let start = start as usize - *offset;
                Ok(data.slice(start..))
            }
        }
    }
}

impl Length for ColumnChunkData {
    /// Return the total length of the full column chunk
    fn len(&self) -> u64 {
        match &self {
            ColumnChunkData::Sparse { length, .. } => *length as u64,
            ColumnChunkData::Dense { data, .. } => data.len() as u64,
        }
    }
}

impl ChunkReader for ColumnChunkData {
    type T = bytes::buf::Reader<Bytes>;

    fn get_read(&self, start: u64) -> crate::errors::Result<Self::T> {
        Ok(self.get(start)?.reader())
    }

    fn get_bytes(&self, start: u64, length: usize) -> crate::errors::Result<Bytes> {
        Ok(self.get(start)?.slice(..length))
    }
}

/// Implements [`PageIterator`] for a single column chunk, yielding a single [`PageReader`]
struct ColumnChunkIterator {
    reader: Option<crate::errors::Result<Box<dyn PageReader>>>,
}

impl Iterator for ColumnChunkIterator {
    type Item = crate::errors::Result<Box<dyn PageReader>>;

    fn next(&mut self) -> Option<Self::Item> {
        self.reader.take()
    }
}

impl PageIterator for ColumnChunkIterator {}