parquet/file/page_index/
index_reader.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Support for reading [`Index`] and [`PageLocation`] from parquet metadata.
19
20use crate::basic::Type;
21use crate::data_type::Int96;
22use crate::errors::ParquetError;
23use crate::file::metadata::ColumnChunkMetaData;
24use crate::file::page_index::index::{Index, NativeIndex};
25use crate::file::page_index::offset_index::OffsetIndexMetaData;
26use crate::file::reader::ChunkReader;
27use crate::format::{ColumnIndex, OffsetIndex, PageLocation};
28use crate::thrift::{TCompactSliceInputProtocol, TSerializable};
29use std::ops::Range;
30
/// Returns the smallest range that covers both optional ranges.
///
/// When only one argument is `Some`, that range is returned unchanged; when
/// both are `None`, the result is `None`.
///
/// For example `acc_range(Some(7..9), Some(1..3)) = Some(1..9)`
pub(crate) fn acc_range(a: Option<Range<u64>>, b: Option<Range<u64>>) -> Option<Range<u64>> {
    let (lhs, rhs) = match (a, b) {
        (Some(lhs), Some(rhs)) => (lhs, rhs),
        // At most one side is present: hand it (or `None`) straight back.
        (None, only) | (only, None) => return only,
    };

    let start = lhs.start.min(rhs.start);
    let end = lhs.end.max(rhs.end);
    Some(start..end)
}
40
41/// Reads per-column [`Index`] for all columns of a row group by
42/// decoding [`ColumnIndex`] .
43///
44/// Returns a vector of `index[column_number]`.
45///
46/// Returns `None` if this row group does not contain a [`ColumnIndex`].
47///
48/// See [Page Index Documentation] for more details.
49///
50/// [Page Index Documentation]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
51pub fn read_columns_indexes<R: ChunkReader>(
52    reader: &R,
53    chunks: &[ColumnChunkMetaData],
54) -> Result<Option<Vec<Index>>, ParquetError> {
55    let fetch = chunks
56        .iter()
57        .fold(None, |range, c| acc_range(range, c.column_index_range()));
58
59    let fetch = match fetch {
60        Some(r) => r,
61        None => return Ok(None),
62    };
63
64    let bytes = reader.get_bytes(fetch.start as _, (fetch.end - fetch.start).try_into()?)?;
65
66    Some(
67        chunks
68            .iter()
69            .map(|c| match c.column_index_range() {
70                Some(r) => decode_column_index(
71                    &bytes[usize::try_from(r.start - fetch.start)?
72                        ..usize::try_from(r.end - fetch.start)?],
73                    c.column_type(),
74                ),
75                None => Ok(Index::NONE),
76            })
77            .collect(),
78    )
79    .transpose()
80}
81
82/// Reads [`OffsetIndex`],  per-page [`PageLocation`] for all columns of a row
83/// group.
84///
85/// Returns a vector of `location[column_number][page_number]`
86///
87/// Return an empty vector if this row group does not contain an
88/// [`OffsetIndex]`.
89///
90/// See [Page Index Documentation] for more details.
91///
92/// [Page Index Documentation]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
93#[deprecated(since = "53.0.0", note = "Use read_offset_indexes")]
94pub fn read_pages_locations<R: ChunkReader>(
95    reader: &R,
96    chunks: &[ColumnChunkMetaData],
97) -> Result<Vec<Vec<PageLocation>>, ParquetError> {
98    let fetch = chunks
99        .iter()
100        .fold(None, |range, c| acc_range(range, c.offset_index_range()));
101
102    let fetch = match fetch {
103        Some(r) => r,
104        None => return Ok(vec![]),
105    };
106
107    let bytes = reader.get_bytes(fetch.start as _, (fetch.end - fetch.start).try_into()?)?;
108
109    chunks
110        .iter()
111        .map(|c| match c.offset_index_range() {
112            Some(r) => decode_page_locations(
113                &bytes[usize::try_from(r.start - fetch.start)?
114                    ..usize::try_from(r.end - fetch.start)?],
115            ),
116            None => Err(general_err!("missing offset index")),
117        })
118        .collect()
119}
120
121/// Reads per-column [`OffsetIndexMetaData`] for all columns of a row group by
122/// decoding [`OffsetIndex`] .
123///
124/// Returns a vector of `offset_index[column_number]`.
125///
126/// Returns `None` if this row group does not contain an [`OffsetIndex`].
127///
128/// See [Page Index Documentation] for more details.
129///
130/// [Page Index Documentation]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
131pub fn read_offset_indexes<R: ChunkReader>(
132    reader: &R,
133    chunks: &[ColumnChunkMetaData],
134) -> Result<Option<Vec<OffsetIndexMetaData>>, ParquetError> {
135    let fetch = chunks
136        .iter()
137        .fold(None, |range, c| acc_range(range, c.offset_index_range()));
138
139    let fetch = match fetch {
140        Some(r) => r,
141        None => return Ok(None),
142    };
143
144    let bytes = reader.get_bytes(fetch.start as _, (fetch.end - fetch.start).try_into()?)?;
145
146    Some(
147        chunks
148            .iter()
149            .map(|c| match c.offset_index_range() {
150                Some(r) => decode_offset_index(
151                    &bytes[usize::try_from(r.start - fetch.start)?
152                        ..usize::try_from(r.end - fetch.start)?],
153                ),
154                None => Err(general_err!("missing offset index")),
155            })
156            .collect(),
157    )
158    .transpose()
159}
160
161pub(crate) fn decode_offset_index(data: &[u8]) -> Result<OffsetIndexMetaData, ParquetError> {
162    let mut prot = TCompactSliceInputProtocol::new(data);
163    let offset = OffsetIndex::read_from_in_protocol(&mut prot)?;
164    OffsetIndexMetaData::try_new(offset)
165}
166
167pub(crate) fn decode_page_locations(data: &[u8]) -> Result<Vec<PageLocation>, ParquetError> {
168    let mut prot = TCompactSliceInputProtocol::new(data);
169    let offset = OffsetIndex::read_from_in_protocol(&mut prot)?;
170    Ok(offset.page_locations)
171}
172
173pub(crate) fn decode_column_index(data: &[u8], column_type: Type) -> Result<Index, ParquetError> {
174    let mut prot = TCompactSliceInputProtocol::new(data);
175
176    let index = ColumnIndex::read_from_in_protocol(&mut prot)?;
177
178    let index = match column_type {
179        Type::BOOLEAN => Index::BOOLEAN(NativeIndex::<bool>::try_new(index)?),
180        Type::INT32 => Index::INT32(NativeIndex::<i32>::try_new(index)?),
181        Type::INT64 => Index::INT64(NativeIndex::<i64>::try_new(index)?),
182        Type::INT96 => Index::INT96(NativeIndex::<Int96>::try_new(index)?),
183        Type::FLOAT => Index::FLOAT(NativeIndex::<f32>::try_new(index)?),
184        Type::DOUBLE => Index::DOUBLE(NativeIndex::<f64>::try_new(index)?),
185        Type::BYTE_ARRAY => Index::BYTE_ARRAY(NativeIndex::try_new(index)?),
186        Type::FIXED_LEN_BYTE_ARRAY => Index::FIXED_LEN_BYTE_ARRAY(NativeIndex::try_new(index)?),
187    };
188
189    Ok(index)
190}