parquet/file/page_index/
index_reader.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Support for reading [`ColumnIndexMetaData`] and [`OffsetIndexMetaData`] from parquet metadata.
19
20use crate::basic::{BoundaryOrder, Type};
21use crate::data_type::Int96;
22use crate::errors::{ParquetError, Result};
23use crate::file::metadata::ColumnChunkMetaData;
24use crate::file::page_index::column_index::{
25    ByteArrayColumnIndex, ColumnIndexMetaData, PrimitiveColumnIndex,
26};
27use crate::file::page_index::offset_index::OffsetIndexMetaData;
28use crate::file::reader::ChunkReader;
29use crate::parquet_thrift::{
30    read_thrift_vec, ElementType, FieldType, ReadThrift, ThriftCompactInputProtocol,
31    ThriftCompactOutputProtocol, ThriftSliceInputProtocol, WriteThrift, WriteThriftField,
32};
33use crate::thrift_struct;
34use std::io::Write;
35use std::ops::Range;
36
/// Returns the smallest range that covers both optional input ranges.
///
/// When only one side is present it is returned unchanged; when neither is
/// present the result is `None`.
///
/// For example `acc_range(Some(7..9), Some(1..3)) = Some(1..9)`
pub(crate) fn acc_range(a: Option<Range<u64>>, b: Option<Range<u64>>) -> Option<Range<u64>> {
    // Nothing on the left: the right side (possibly `None`) is the answer.
    let Some(lhs) = a else {
        return b;
    };
    // Nothing on the right: the left side already covers everything.
    let Some(rhs) = b else {
        return Some(lhs);
    };

    let start = lhs.start.min(rhs.start);
    let end = lhs.end.max(rhs.end);
    Some(start..end)
}
46
47/// Reads per-column [`ColumnIndexMetaData`] for all columns of a row group by
48/// decoding [`ColumnIndex`] .
49///
50/// Returns a vector of `index[column_number]`.
51///
52/// Returns `None` if this row group does not contain a [`ColumnIndex`].
53///
54/// See [Page Index Documentation] for more details.
55///
56/// [Page Index Documentation]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
57/// [`ColumnIndex`]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
58#[deprecated(
59    since = "55.2.0",
60    note = "Use ParquetMetaDataReader instead; will be removed in 58.0.0"
61)]
62pub fn read_columns_indexes<R: ChunkReader>(
63    reader: &R,
64    chunks: &[ColumnChunkMetaData],
65) -> Result<Option<Vec<ColumnIndexMetaData>>, ParquetError> {
66    let fetch = chunks
67        .iter()
68        .fold(None, |range, c| acc_range(range, c.column_index_range()));
69
70    let fetch = match fetch {
71        Some(r) => r,
72        None => return Ok(None),
73    };
74
75    let bytes = reader.get_bytes(fetch.start as _, (fetch.end - fetch.start).try_into()?)?;
76
77    Some(
78        chunks
79            .iter()
80            .map(|c| match c.column_index_range() {
81                Some(r) => decode_column_index(
82                    &bytes[usize::try_from(r.start - fetch.start)?
83                        ..usize::try_from(r.end - fetch.start)?],
84                    c.column_type(),
85                ),
86                None => Ok(ColumnIndexMetaData::NONE),
87            })
88            .collect(),
89    )
90    .transpose()
91}
92
93/// Reads per-column [`OffsetIndexMetaData`] for all columns of a row group by
94/// decoding [`OffsetIndex`] .
95///
96/// Returns a vector of `offset_index[column_number]`.
97///
98/// Returns `None` if this row group does not contain an [`OffsetIndex`].
99///
100/// See [Page Index Documentation] for more details.
101///
102/// [Page Index Documentation]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
103/// [`OffsetIndex`]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
104#[deprecated(
105    since = "55.2.0",
106    note = "Use ParquetMetaDataReader instead; will be removed in 58.0.0"
107)]
108pub fn read_offset_indexes<R: ChunkReader>(
109    reader: &R,
110    chunks: &[ColumnChunkMetaData],
111) -> Result<Option<Vec<OffsetIndexMetaData>>, ParquetError> {
112    let fetch = chunks
113        .iter()
114        .fold(None, |range, c| acc_range(range, c.offset_index_range()));
115
116    let fetch = match fetch {
117        Some(r) => r,
118        None => return Ok(None),
119    };
120
121    let bytes = reader.get_bytes(fetch.start as _, (fetch.end - fetch.start).try_into()?)?;
122
123    Some(
124        chunks
125            .iter()
126            .map(|c| match c.offset_index_range() {
127                Some(r) => decode_offset_index(
128                    &bytes[usize::try_from(r.start - fetch.start)?
129                        ..usize::try_from(r.end - fetch.start)?],
130                ),
131                None => Err(general_err!("missing offset index")),
132            })
133            .collect(),
134    )
135    .transpose()
136}
137
138pub(crate) fn decode_offset_index(data: &[u8]) -> Result<OffsetIndexMetaData, ParquetError> {
139    let mut prot = ThriftSliceInputProtocol::new(data);
140
141    // Try to read fast-path first. If that fails, fall back to slower but more robust
142    // decoder.
143    match OffsetIndexMetaData::try_from_fast(&mut prot) {
144        Ok(offset_index) => Ok(offset_index),
145        Err(_) => {
146            prot = ThriftSliceInputProtocol::new(data);
147            OffsetIndexMetaData::read_thrift(&mut prot)
148        }
149    }
150}
151
// Private struct only used for decoding, then discarded: `decode_column_index`
// reads it from the raw bytes and immediately converts it into a typed
// `ColumnIndexMetaData` variant via `try_from_thrift`.
//
// NOTE(review): the field ids and names appear to follow the `ColumnIndex`
// struct of the Parquet Thrift definition — confirm against parquet.thrift
// before changing them.
thrift_struct!(
pub(super) struct ThriftColumnIndex<'a> {
  1: required list<bool> null_pages
  2: required list<'a><binary> min_values
  3: required list<'a><binary> max_values
  4: required BoundaryOrder boundary_order
  5: optional list<i64> null_counts
  6: optional list<i64> repetition_level_histograms;
  7: optional list<i64> definition_level_histograms;
}
);
164
165pub(crate) fn decode_column_index(
166    data: &[u8],
167    column_type: Type,
168) -> Result<ColumnIndexMetaData, ParquetError> {
169    let mut prot = ThriftSliceInputProtocol::new(data);
170    let index = ThriftColumnIndex::read_thrift(&mut prot)?;
171
172    let index = match column_type {
173        Type::BOOLEAN => {
174            ColumnIndexMetaData::BOOLEAN(PrimitiveColumnIndex::<bool>::try_from_thrift(index)?)
175        }
176        Type::INT32 => {
177            ColumnIndexMetaData::INT32(PrimitiveColumnIndex::<i32>::try_from_thrift(index)?)
178        }
179        Type::INT64 => {
180            ColumnIndexMetaData::INT64(PrimitiveColumnIndex::<i64>::try_from_thrift(index)?)
181        }
182        Type::INT96 => {
183            ColumnIndexMetaData::INT96(PrimitiveColumnIndex::<Int96>::try_from_thrift(index)?)
184        }
185        Type::FLOAT => {
186            ColumnIndexMetaData::FLOAT(PrimitiveColumnIndex::<f32>::try_from_thrift(index)?)
187        }
188        Type::DOUBLE => {
189            ColumnIndexMetaData::DOUBLE(PrimitiveColumnIndex::<f64>::try_from_thrift(index)?)
190        }
191        Type::BYTE_ARRAY => {
192            ColumnIndexMetaData::BYTE_ARRAY(ByteArrayColumnIndex::try_from_thrift(index)?)
193        }
194        Type::FIXED_LEN_BYTE_ARRAY => {
195            ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(ByteArrayColumnIndex::try_from_thrift(index)?)
196        }
197    };
198
199    Ok(index)
200}