parquet/file/page_index/
index_reader.rs1use crate::basic::Type;
21use crate::data_type::Int96;
22use crate::errors::ParquetError;
23use crate::file::metadata::ColumnChunkMetaData;
24use crate::file::page_index::index::{Index, NativeIndex};
25use crate::file::page_index::offset_index::OffsetIndexMetaData;
26use crate::file::reader::ChunkReader;
27use crate::format::{ColumnIndex, OffsetIndex, PageLocation};
28use crate::thrift::{TCompactSliceInputProtocol, TSerializable};
29use std::ops::Range;
30
/// Combines two optional ranges into one range spanning both.
///
/// When both inputs are present the result runs from the smaller of the two
/// starts to the larger of the two ends. When only one input is present it is
/// returned unchanged, and `None` is returned when both are absent.
pub(crate) fn acc_range(a: Option<Range<u64>>, b: Option<Range<u64>>) -> Option<Range<u64>> {
    match a {
        // Nothing accumulated on the left: the right side (possibly `None`) is the answer.
        None => b,
        Some(lhs) => match b {
            None => Some(lhs),
            Some(rhs) => {
                let start = lhs.start.min(rhs.start);
                let end = lhs.end.max(rhs.end);
                Some(start..end)
            }
        },
    }
}
40
41pub fn read_columns_indexes<R: ChunkReader>(
52 reader: &R,
53 chunks: &[ColumnChunkMetaData],
54) -> Result<Option<Vec<Index>>, ParquetError> {
55 let fetch = chunks
56 .iter()
57 .fold(None, |range, c| acc_range(range, c.column_index_range()));
58
59 let fetch = match fetch {
60 Some(r) => r,
61 None => return Ok(None),
62 };
63
64 let bytes = reader.get_bytes(fetch.start as _, (fetch.end - fetch.start).try_into()?)?;
65
66 Some(
67 chunks
68 .iter()
69 .map(|c| match c.column_index_range() {
70 Some(r) => decode_column_index(
71 &bytes[usize::try_from(r.start - fetch.start)?
72 ..usize::try_from(r.end - fetch.start)?],
73 c.column_type(),
74 ),
75 None => Ok(Index::NONE),
76 })
77 .collect(),
78 )
79 .transpose()
80}
81
82#[deprecated(since = "53.0.0", note = "Use read_offset_indexes")]
94pub fn read_pages_locations<R: ChunkReader>(
95 reader: &R,
96 chunks: &[ColumnChunkMetaData],
97) -> Result<Vec<Vec<PageLocation>>, ParquetError> {
98 let fetch = chunks
99 .iter()
100 .fold(None, |range, c| acc_range(range, c.offset_index_range()));
101
102 let fetch = match fetch {
103 Some(r) => r,
104 None => return Ok(vec![]),
105 };
106
107 let bytes = reader.get_bytes(fetch.start as _, (fetch.end - fetch.start).try_into()?)?;
108
109 chunks
110 .iter()
111 .map(|c| match c.offset_index_range() {
112 Some(r) => decode_page_locations(
113 &bytes[usize::try_from(r.start - fetch.start)?
114 ..usize::try_from(r.end - fetch.start)?],
115 ),
116 None => Err(general_err!("missing offset index")),
117 })
118 .collect()
119}
120
121pub fn read_offset_indexes<R: ChunkReader>(
132 reader: &R,
133 chunks: &[ColumnChunkMetaData],
134) -> Result<Option<Vec<OffsetIndexMetaData>>, ParquetError> {
135 let fetch = chunks
136 .iter()
137 .fold(None, |range, c| acc_range(range, c.offset_index_range()));
138
139 let fetch = match fetch {
140 Some(r) => r,
141 None => return Ok(None),
142 };
143
144 let bytes = reader.get_bytes(fetch.start as _, (fetch.end - fetch.start).try_into()?)?;
145
146 Some(
147 chunks
148 .iter()
149 .map(|c| match c.offset_index_range() {
150 Some(r) => decode_offset_index(
151 &bytes[usize::try_from(r.start - fetch.start)?
152 ..usize::try_from(r.end - fetch.start)?],
153 ),
154 None => Err(general_err!("missing offset index")),
155 })
156 .collect(),
157 )
158 .transpose()
159}
160
161pub(crate) fn decode_offset_index(data: &[u8]) -> Result<OffsetIndexMetaData, ParquetError> {
162 let mut prot = TCompactSliceInputProtocol::new(data);
163 let offset = OffsetIndex::read_from_in_protocol(&mut prot)?;
164 OffsetIndexMetaData::try_new(offset)
165}
166
167pub(crate) fn decode_page_locations(data: &[u8]) -> Result<Vec<PageLocation>, ParquetError> {
168 let mut prot = TCompactSliceInputProtocol::new(data);
169 let offset = OffsetIndex::read_from_in_protocol(&mut prot)?;
170 Ok(offset.page_locations)
171}
172
173pub(crate) fn decode_column_index(data: &[u8], column_type: Type) -> Result<Index, ParquetError> {
174 let mut prot = TCompactSliceInputProtocol::new(data);
175
176 let index = ColumnIndex::read_from_in_protocol(&mut prot)?;
177
178 let index = match column_type {
179 Type::BOOLEAN => Index::BOOLEAN(NativeIndex::<bool>::try_new(index)?),
180 Type::INT32 => Index::INT32(NativeIndex::<i32>::try_new(index)?),
181 Type::INT64 => Index::INT64(NativeIndex::<i64>::try_new(index)?),
182 Type::INT96 => Index::INT96(NativeIndex::<Int96>::try_new(index)?),
183 Type::FLOAT => Index::FLOAT(NativeIndex::<f32>::try_new(index)?),
184 Type::DOUBLE => Index::DOUBLE(NativeIndex::<f64>::try_new(index)?),
185 Type::BYTE_ARRAY => Index::BYTE_ARRAY(NativeIndex::try_new(index)?),
186 Type::FIXED_LEN_BYTE_ARRAY => Index::FIXED_LEN_BYTE_ARRAY(NativeIndex::try_new(index)?),
187 };
188
189 Ok(index)
190}