parquet/file/page_index/
index_reader.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Support for reading [`Index`] and [`OffsetIndex`] from parquet metadata.
19
20use crate::basic::Type;
21use crate::data_type::Int96;
22use crate::errors::ParquetError;
23use crate::file::metadata::ColumnChunkMetaData;
24use crate::file::page_index::index::{Index, NativeIndex};
25use crate::file::page_index::offset_index::OffsetIndexMetaData;
26use crate::file::reader::ChunkReader;
27use crate::format::{ColumnIndex, OffsetIndex};
28use crate::thrift::{TCompactSliceInputProtocol, TSerializable};
29use std::ops::Range;
30
/// Returns the smallest range that covers both `a` and `b`.
///
/// When only one of the two ranges is present it is returned unchanged, and
/// `None` is returned when neither is present.
///
/// For example `acc_range(Some(7..9), Some(1..3)) = Some(1..9)`
pub(crate) fn acc_range(a: Option<Range<u64>>, b: Option<Range<u64>>) -> Option<Range<u64>> {
    let (lhs, rhs) = match (a, b) {
        (Some(lhs), Some(rhs)) => (lhs, rhs),
        // At most one side is present: that side (possibly `None`) is the answer.
        (only, None) | (None, only) => return only,
    };

    let start = lhs.start.min(rhs.start);
    let end = lhs.end.max(rhs.end);
    Some(start..end)
}
40
41/// Reads per-column [`Index`] for all columns of a row group by
42/// decoding [`ColumnIndex`] .
43///
44/// Returns a vector of `index[column_number]`.
45///
46/// Returns `None` if this row group does not contain a [`ColumnIndex`].
47///
48/// See [Page Index Documentation] for more details.
49///
50/// [Page Index Documentation]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
51#[deprecated(
52    since = "55.2.0",
53    note = "Use ParquetMetaDataReader instead; will be removed in 58.0.0"
54)]
55pub fn read_columns_indexes<R: ChunkReader>(
56    reader: &R,
57    chunks: &[ColumnChunkMetaData],
58) -> Result<Option<Vec<Index>>, ParquetError> {
59    let fetch = chunks
60        .iter()
61        .fold(None, |range, c| acc_range(range, c.column_index_range()));
62
63    let fetch = match fetch {
64        Some(r) => r,
65        None => return Ok(None),
66    };
67
68    let bytes = reader.get_bytes(fetch.start as _, (fetch.end - fetch.start).try_into()?)?;
69
70    Some(
71        chunks
72            .iter()
73            .map(|c| match c.column_index_range() {
74                Some(r) => decode_column_index(
75                    &bytes[usize::try_from(r.start - fetch.start)?
76                        ..usize::try_from(r.end - fetch.start)?],
77                    c.column_type(),
78                ),
79                None => Ok(Index::NONE),
80            })
81            .collect(),
82    )
83    .transpose()
84}
85
86/// Reads per-column [`OffsetIndexMetaData`] for all columns of a row group by
87/// decoding [`OffsetIndex`] .
88///
89/// Returns a vector of `offset_index[column_number]`.
90///
91/// Returns `None` if this row group does not contain an [`OffsetIndex`].
92///
93/// See [Page Index Documentation] for more details.
94///
95/// [Page Index Documentation]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
96#[deprecated(
97    since = "55.2.0",
98    note = "Use ParquetMetaDataReader instead; will be removed in 58.0.0"
99)]
100pub fn read_offset_indexes<R: ChunkReader>(
101    reader: &R,
102    chunks: &[ColumnChunkMetaData],
103) -> Result<Option<Vec<OffsetIndexMetaData>>, ParquetError> {
104    let fetch = chunks
105        .iter()
106        .fold(None, |range, c| acc_range(range, c.offset_index_range()));
107
108    let fetch = match fetch {
109        Some(r) => r,
110        None => return Ok(None),
111    };
112
113    let bytes = reader.get_bytes(fetch.start as _, (fetch.end - fetch.start).try_into()?)?;
114
115    Some(
116        chunks
117            .iter()
118            .map(|c| match c.offset_index_range() {
119                Some(r) => decode_offset_index(
120                    &bytes[usize::try_from(r.start - fetch.start)?
121                        ..usize::try_from(r.end - fetch.start)?],
122                ),
123                None => Err(general_err!("missing offset index")),
124            })
125            .collect(),
126    )
127    .transpose()
128}
129
130pub(crate) fn decode_offset_index(data: &[u8]) -> Result<OffsetIndexMetaData, ParquetError> {
131    let mut prot = TCompactSliceInputProtocol::new(data);
132    let offset = OffsetIndex::read_from_in_protocol(&mut prot)?;
133    OffsetIndexMetaData::try_new(offset)
134}
135
136pub(crate) fn decode_column_index(data: &[u8], column_type: Type) -> Result<Index, ParquetError> {
137    let mut prot = TCompactSliceInputProtocol::new(data);
138
139    let index = ColumnIndex::read_from_in_protocol(&mut prot)?;
140
141    let index = match column_type {
142        Type::BOOLEAN => Index::BOOLEAN(NativeIndex::<bool>::try_new(index)?),
143        Type::INT32 => Index::INT32(NativeIndex::<i32>::try_new(index)?),
144        Type::INT64 => Index::INT64(NativeIndex::<i64>::try_new(index)?),
145        Type::INT96 => Index::INT96(NativeIndex::<Int96>::try_new(index)?),
146        Type::FLOAT => Index::FLOAT(NativeIndex::<f32>::try_new(index)?),
147        Type::DOUBLE => Index::DOUBLE(NativeIndex::<f64>::try_new(index)?),
148        Type::BYTE_ARRAY => Index::BYTE_ARRAY(NativeIndex::try_new(index)?),
149        Type::FIXED_LEN_BYTE_ARRAY => Index::FIXED_LEN_BYTE_ARRAY(NativeIndex::try_new(index)?),
150    };
151
152    Ok(index)
153}